You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/21 03:26:20 UTC

[1/4] remove 0MQ and replace with netty

Updated Branches:
  refs/heads/master d3414f576 -> d79f6b808


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
deleted file mode 100644
index f2b9329..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormClientHandler extends SimpleChannelUpstreamHandler  {
-    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
-    private Client client;
-    private AtomicBoolean being_closed;
-    long start_time; 
-    
-    StormClientHandler(Client client) {
-        this.client = client;
-        being_closed = new AtomicBoolean(false);
-        start_time = System.currentTimeMillis();
-    }
-
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
-        //register the newly established channel
-        Channel channel = event.getChannel();
-        client.setChannel(channel);
-        LOG.debug("connection established to a remote host");
-        
-        //send next request
-        try {
-            sendRequests(channel, client.takeMessages());
-        } catch (InterruptedException e) {
-            channel.close();
-        }
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
-        
-        //examine the response message from server
-        ControlMessage msg = (ControlMessage)event.getMessage();
-        if (msg==ControlMessage.FAILURE_RESPONSE)
-            LOG.info("failure response:{}", msg);
-
-        //send next request
-        Channel channel = event.getChannel();
-        try {
-            sendRequests(channel, client.takeMessages());
-        } catch (InterruptedException e) {
-            channel.close();
-        }
-    }
-
-    /**
-     * Retrieve a request from message queue, and send to server
-     * @param channel
-     */
-    private void sendRequests(Channel channel, final MessageBatch requests) {
-        if (requests==null || requests.size()==0 || being_closed.get()) return;
-
-        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
-        Object last_msg = requests.get(requests.size()-1);
-        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
-            being_closed.set(true);
-            requests.remove(last_msg);
-        }
-
-        //we may don't need do anything if no requests found
-        if (requests.isEmpty()) {
-            if (being_closed.get())
-                client.close_n_release();
-            return;
-        }
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send requests:", future.getCause());
-                    future.getChannel().close();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-                }
-                if (being_closed.get())
-                    client.close_n_release();
-            }
-        });
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
-        if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection failed:", cause);
-        } 
-        if (!being_closed.get()) {
-            client.setChannel(null);
-            client.reconnect();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
deleted file mode 100644
index 91c513a..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-
-class StormClientPipelineFactory implements ChannelPipelineFactory {
-    private Client client;
-
-    StormClientPipelineFactory(Client client) {
-        this.client = client;        
-    }
-
-    public ChannelPipeline getPipeline() throws Exception {
-        // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
-        // business logic.
-        pipeline.addLast("handler", new StormClientHandler(client));
-
-        return pipeline;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
deleted file mode 100644
index fd21834..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.messaging.TaskMessage;
-
-class StormServerHandler extends SimpleChannelUpstreamHandler  {
-    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
-    Server server;
-    private AtomicInteger failure_count; 
-    
-    StormServerHandler(Server server) {
-        this.server = server;
-        failure_count = new AtomicInteger(0);
-    }
-    
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-        server.addChannel(e.getChannel());
-    }
-    
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();  
-        if (msg == null) return;
-
-        //end of batch?
-        if (msg==ControlMessage.EOB_MESSAGE) {
-            Channel channel = ctx.getChannel();
-            LOG.debug("Send back response ...");
-            if (failure_count.get()==0)
-                channel.write(ControlMessage.OK_RESPONSE);
-            else channel.write(ControlMessage.FAILURE_RESPONSE);
-            return;
-        }
-        
-        //enqueue the received message for processing
-        try {
-            server.enqueue((TaskMessage)msg);
-        } catch (InterruptedException e1) {
-            LOG.info("failed to enqueue a request message", e);
-            failure_count.incrementAndGet();
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        server.closeChannel(e.getChannel());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
deleted file mode 100644
index 56b0834..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-
-
-class StormServerPipelineFactory implements  ChannelPipelineFactory {
-    private Server server;
-    
-    StormServerPipelineFactory(Server server) {
-        this.server = server;        
-    }
-    
-    public ChannelPipeline getPipeline() throws Exception {
-        // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        // Decoder
-        pipeline.addLast("decoder", new MessageDecoder());
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder());
-        // business logic.
-        pipeline.addLast("handler", new StormServerHandler(server));
-
-        return pipeline;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
deleted file mode 100644
index eefcb48..0000000
--- a/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ /dev/null
@@ -1,44 +0,0 @@
-(ns backtype.storm.messaging.netty-integration-test
-  (:use [clojure test])
-  (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
-  (:use [backtype.storm bootstrap testing util]))
-
-(bootstrap)
-
-(deftest test-integration
-  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
-                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
-                                                    STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
-                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
-                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                                                    }]
-    (let [topology (thrift/mk-topology
-                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
-                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
-                                               :parallelism-hint 6)})
-          results (complete-topology cluster
-                                     topology
-                                     ;; important for test that
-                                     ;; #tuples = multiple of 4 and 6
-                                     :storm-conf {TOPOLOGY-WORKERS 3}
-                                     :mock-sources {"1" [["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ]}
-                                     )]
-      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
-               (read-tuples results "2"))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
deleted file mode 100644
index 12ebe5d..0000000
--- a/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ /dev/null
@@ -1,97 +0,0 @@
-(ns backtype.storm.messaging.netty-unit-test
-  (:use [clojure test])
-  (:import [backtype.storm.messaging TransportFactory])
-  (:use [backtype.storm bootstrap testing util]))
-
-(bootstrap)
-
-(def port 6700) 
-(def task 1) 
-
-(deftest test-basic
-  (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        server (.bind context nil port)
-        client (.connect context nil "localhost" port)
-        _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
-    (.close client)
-    (.close server)
-    (.term context)))    
-
-(deftest test-large-msg
-  (let [req_msg (apply str (repeat 2048000 'c')) 
-        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        server (.bind context nil port)
-        client (.connect context nil "localhost" port)
-        _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
-    (.close client)
-    (.close server)
-    (.term context)))    
-    
-(deftest test-server-delayed
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        client (.connect context nil "localhost" port)
-        _ (.send client task (.getBytes req_msg))
-        _ (Thread/sleep 1000)
-        server (.bind context nil port)
-        resp (.recv server 0)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
-    (.close client)
-    (.close server)
-    (.term context)))    
-
-(deftest test-batch
-  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        server (.bind context nil port)
-        client (.connect context nil "localhost" port)]
-    (doseq [num  (range 1 100000)]
-      (let [req_msg (str num)]
-        (.send client task (.getBytes req_msg))))
-    (doseq [num  (range 1 100000)]
-      (let [req_msg (str num)
-            resp (.recv server 0)
-            resp_msg (String. (.message resp))]
-        (is (= req_msg resp_msg))))
-    (.close client)
-    (.close server)
-    (.term context)))


[2/4] git commit: remove 0MQ and replace with netty

Posted by pt...@apache.org.
remove 0MQ and replace with netty


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b63ed139
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b63ed139
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b63ed139

Branch: refs/heads/master
Commit: b63ed13946eae3acaab4a51bba705124701207eb
Parents: 1bcc169
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Dec 10 11:23:11 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Dec 10 11:23:11 2013 -0500

----------------------------------------------------------------------
 MODULES                                         |   1 -
 bin/install_zmq.sh                              |  31 ---
 conf/defaults.yaml                              |   2 +-
 storm-core/project.clj                          |   2 +-
 .../src/clj/backtype/storm/messaging/zmq.clj    |  93 ---------
 storm-core/src/clj/backtype/storm/testing.clj   |   2 +-
 storm-core/src/clj/zilch/mq.clj                 | 104 ----------
 .../backtype/storm/messaging/netty/Client.java  | 204 ++++++++++++++++++
 .../backtype/storm/messaging/netty/Context.java |  50 +++++
 .../storm/messaging/netty/ControlMessage.java   |  50 +++++
 .../storm/messaging/netty/MessageBatch.java     | 151 ++++++++++++++
 .../storm/messaging/netty/MessageDecoder.java   |  68 ++++++
 .../storm/messaging/netty/MessageEncoder.java   |  22 ++
 .../backtype/storm/messaging/netty/Server.java  | 119 +++++++++++
 .../messaging/netty/StormClientHandler.java     | 104 ++++++++++
 .../netty/StormClientPipelineFactory.java       |  27 +++
 .../messaging/netty/StormServerHandler.java     |  53 +++++
 .../netty/StormServerPipelineFactory.java       |  28 +++
 .../storm/messaging/netty_integration_test.clj  |  44 ++++
 .../storm/messaging/netty_unit_test.clj         |  97 +++++++++
 .../test/clj/backtype/storm/messaging_test.clj  |   2 +-
 storm-core/test/clj/zilch/test/mq.clj           |  86 --------
 storm-netty/project.clj                         |  13 --
 .../backtype/storm/messaging/netty/Client.java  | 205 -------------------
 .../backtype/storm/messaging/netty/Context.java |  50 -----
 .../storm/messaging/netty/ControlMessage.java   |  51 -----
 .../storm/messaging/netty/MessageBatch.java     | 153 --------------
 .../storm/messaging/netty/MessageDecoder.java   |  68 ------
 .../storm/messaging/netty/MessageEncoder.java   |  22 --
 .../backtype/storm/messaging/netty/Server.java  | 120 -----------
 .../messaging/netty/StormClientHandler.java     | 111 ----------
 .../netty/StormClientPipelineFactory.java       |  27 ---
 .../messaging/netty/StormServerHandler.java     |  59 ------
 .../netty/StormServerPipelineFactory.java       |  28 ---
 .../storm/messaging/netty_integration_test.clj  |  44 ----
 .../storm/messaging/netty_unit_test.clj         |  97 ---------
 36 files changed, 1021 insertions(+), 1367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/MODULES
----------------------------------------------------------------------
diff --git a/MODULES b/MODULES
index 76c078a..aa29093 100644
--- a/MODULES
+++ b/MODULES
@@ -1,4 +1,3 @@
 storm-console-logging
 storm-core
-storm-netty
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/bin/install_zmq.sh
----------------------------------------------------------------------
diff --git a/bin/install_zmq.sh b/bin/install_zmq.sh
deleted file mode 100755
index dc744f1..0000000
--- a/bin/install_zmq.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash
-export JAVA_HOME=${JAVA_HOME:/usr/libexec/java_home}
-
-if [ ! -d "$JAVA_HOME/include" ]; then
-    echo "
-Looks like you're missing your 'include' directory. If you're using Mac OS X, You'll need to install the Java dev package.
-
-- Navigate to http://goo.gl/D8lI
-- Click the Java tab on the right
-- Install the appropriate version and try again.
-"
-    exit -1;
-fi
-
-#install zeromq
-wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
-tar -xzf zeromq-2.1.7.tar.gz
-cd zeromq-2.1.7
-./configure
-make
-sudo make install
-
-cd ../
-
-#install jzmq (both native and into local maven cache)
-git clone https://github.com/nathanmarz/jzmq.git
-cd jzmq
-./autogen.sh
-./configure
-make
-sudo make install

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index a5b31f4..35e7b00 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -18,7 +18,7 @@ storm.zookeeper.retry.intervalceiling.millis: 30000
 storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
-storm.messaging.transport: "backtype.storm.messaging.zmq"
+storm.messaging.transport: "backtype.storm.messaging.netty.Context"
 
 ### nimbus.* configs are for the master
 nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/project.clj
----------------------------------------------------------------------
diff --git a/storm-core/project.clj b/storm-core/project.clj
index 0eaa6a3..1b7226a 100644
--- a/storm-core/project.clj
+++ b/storm-core/project.clj
@@ -10,7 +10,6 @@
                  [clj-time "0.4.1"]
                  [com.netflix.curator/curator-framework "1.0.1"
                   :exclusions [log4j/log4j]]
-                 [backtype/jzmq "2.1.0"]
                  [com.googlecode.json-simple/json-simple "1.1"]
                  [compojure "1.1.3"]
                  [hiccup "0.3.6"]
@@ -27,6 +26,7 @@
                  [com.google.guava/guava "13.0"]
                  [ch.qos.logback/logback-classic "1.0.6"]
                  [org.slf4j/log4j-over-slf4j "1.6.6"]
+                 [io.netty/netty "3.6.3.Final"]
                  ]
 
   :source-paths ["src/clj"]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/messaging/zmq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/zmq.clj b/storm-core/src/clj/backtype/storm/messaging/zmq.clj
deleted file mode 100644
index 23e263e..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/zmq.clj
+++ /dev/null
@@ -1,93 +0,0 @@
-(ns backtype.storm.messaging.zmq
-  (:refer-clojure :exclude [send])
-  (:use [backtype.storm config log])
-  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
-  (:import [java.nio ByteBuffer])
-  (:import [org.zeromq ZMQ])
-  (:import [java.util Map])
-  (:require [zilch.mq :as mq])
-  (:gen-class
-    :methods [^{:static true} [makeContext [java.util.Map] backtype.storm.messaging.IContext]]))
-
-(defn mk-packet [task ^bytes message]
-  (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
-    (.putShort bb (short task))
-    (.put bb message)
-    (.array bb)
-    ))
-
-(defn parse-packet [^bytes packet]
-  (let [bb (ByteBuffer/wrap packet)
-        port (.getShort bb)
-        msg (byte-array (- (count packet) 2))]
-    (.get bb msg)
-    (TaskMessage. (int port) msg)
-    ))
-
-(defn get-bind-zmq-url [local? port]
-  (if local?
-    (str "ipc://" port ".ipc")
-    (str "tcp://*:" port)))
-
-(defn get-connect-zmq-url [local? host port]
-  (if local?
-    (str "ipc://" port ".ipc")
-    (str "tcp://" host ":" port)))
-
-
-(defprotocol ZMQContextQuery
-  (zmq-context [this]))
-
-(deftype ZMQConnection [socket]
-  IConnection
-  (^TaskMessage recv [this ^int flags]
-    (require 'backtype.storm.messaging.zmq)
-    (if-let [packet (mq/recv socket flags)]
-      (parse-packet packet)))
-  (^void send [this ^int taskId ^bytes payload]
-    (require 'backtype.storm.messaging.zmq)
-    (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
-  (^void close [this]
-    (.close socket)))
-
-(defn mk-connection [socket]
-  (ZMQConnection. socket))
-
-(deftype ZMQContext [^{:unsynchronized-mutable true} context 
-                     ^{:unsynchronized-mutable true} linger-ms 
-                     ^{:unsynchronized-mutable true} hwm 
-                     ^{:unsynchronized-mutable true} local?]
-  IContext
-  (^void prepare [this ^Map storm-conf]
-    (let [num-threads (.get storm-conf ZMQ-THREADS)]
-      (set! context (mq/context num-threads)) 
-      (set! linger-ms (.get storm-conf ZMQ-LINGER-MILLIS))
-      (set! hwm (.get storm-conf ZMQ-HWM))
-      (set! local? (= (.get storm-conf STORM-CLUSTER-MODE) "local"))))
-  (^IConnection bind [this ^String storm-id ^int port]
-    (require 'backtype.storm.messaging.zmq)
-    (-> context
-      (mq/socket mq/pull)
-      (mq/set-hwm hwm)
-      (mq/bind (get-bind-zmq-url local? port))
-      mk-connection
-      ))
-  (^IConnection connect [this ^String storm-id ^String host ^int port]
-    (require 'backtype.storm.messaging.zmq)
-    (-> context
-      (mq/socket mq/push)
-      (mq/set-hwm hwm)
-      (mq/set-linger linger-ms)
-      (mq/connect (get-connect-zmq-url local? host port))
-      mk-connection))
-  (^void term [this]
-    (.term context))
-  
-  ZMQContextQuery
-  (zmq-context [this]
-    context))
-
-(defn -makeContext [^Map storm-conf] 
-  (let [context (ZMQContext. nil 0 0 true)]
-    (.prepare context storm-conf)
-    context))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 700dce6..a17743a 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -96,7 +96,7 @@
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
         daemon-conf (merge (read-storm-config)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/zilch/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/zilch/mq.clj b/storm-core/src/clj/zilch/mq.clj
deleted file mode 100644
index 27c2094..0000000
--- a/storm-core/src/clj/zilch/mq.clj
+++ /dev/null
@@ -1,104 +0,0 @@
-;; Copyright 2011 Tim Dysinger
-
-;;    Licensed under the Apache License, Version 2.0 (the "License");
-;;    you may not use this file except in compliance with the License.
-;;    You may obtain a copy of the License at
-
-;;        http://www.apache.org/licenses/LICENSE-2.0
-
-;;    Unless required by applicable law or agreed to in writing, software
-;;    distributed under the License is distributed on an "AS IS" BASIS,
-;;    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;;    See the License for the specific language governing permissions and
-;;    limitations under the License.
-
-(ns zilch.mq
-  (:refer-clojure :exclude [send])
-  )
-
-(defmacro zeromq-imports []
-  '(do
-    (import '[org.zeromq ZMQ ZMQ$Context ZMQ$Socket])
-    ))
-
-(zeromq-imports)
-
-(defn ^ZMQ$Context context [threads]
-  (ZMQ/context threads))
-
-(defmacro with-context
-  [id threads & body]
-  `(let [~id (context ~threads)]
-     (try ~@body
-          (finally (.term ~id)))))
-
-(def sndmore ZMQ/SNDMORE)
-
-(def req ZMQ/REQ)
-(def rep ZMQ/REP)
-(def xreq ZMQ/XREQ)
-(def xrep ZMQ/XREP)
-(def pub ZMQ/PUB)
-(def sub ZMQ/SUB)
-(def pair ZMQ/PAIR)
-(def push ZMQ/PUSH)
-(def pull ZMQ/PULL)
-
-(defn ^bytes barr [& arr]
-  (byte-array (map byte arr)))
-
-(defn ^ZMQ$Socket socket
-  [^ZMQ$Context context type]
-  (.socket context type))
-
-(defn set-linger
-  [^ZMQ$Socket socket linger-ms]
-  (doto socket
-    (.setLinger (long linger-ms))))
-
-(defn set-hwm
-  [^ZMQ$Socket socket hwm]
-  (if hwm
-    (doto socket
-      (.setHWM (long hwm)))
-    socket
-    ))
-
-(defn bind
-  [^ZMQ$Socket socket url]
-  (doto socket
-    (.bind url)))
-
-(defn connect
-  [^ZMQ$Socket socket url]
-  (doto socket
-    (.connect url)))
-
-(defn subscribe
-  ([^ZMQ$Socket socket ^bytes topic]
-     (doto socket
-       (.subscribe topic)))
-  ([^ZMQ$Socket socket]
-     (subscribe socket (byte-array []))))
-
-(defn unsubscribe
-  ([^ZMQ$Socket socket ^bytes topic]
-     (doto socket
-       (.unsubscribe (.getBytes topic))))
-  ([^ZMQ$Socket socket]
-     (unsubscribe socket "")))
-
-(defn send
-  ([^ZMQ$Socket socket ^bytes message flags]
-     (.send socket message flags))
-  ([^ZMQ$Socket socket ^bytes message]
-     (send socket message ZMQ/NOBLOCK)))
-
-(defn recv-more? [^ZMQ$Socket socket]
-  (.hasReceiveMore socket))
-
-(defn recv
-  ([^ZMQ$Socket socket flags]
-     (.recv socket flags))
-  ([^ZMQ$Socket socket]
-     (recv socket 0)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
new file mode 100644
index 0000000..c2b391a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -0,0 +1,204 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class Client implements IConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+    private final int max_retries;
+    private final int base_sleep_ms;
+    private final int max_sleep_ms;
+    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
+    private AtomicReference<Channel> channelRef;
+    private final ClientBootstrap bootstrap;
+    private InetSocketAddress remote_addr;
+    private AtomicInteger retries;
+    private final Random random = new Random();
+    private final ChannelFactory factory;
+    private final int buffer_size;
+    private final AtomicBoolean being_closed;
+
+    @SuppressWarnings("rawtypes")
+    Client(Map storm_conf, String host, int port) {
+        message_queue = new LinkedBlockingQueue<Object>();
+        retries = new AtomicInteger(0);
+        channelRef = new AtomicReference<Channel>(null);
+        being_closed = new AtomicBoolean(false);
+
+        // Configure
+        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+
+        if (maxWorkers > 0) {
+            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+        } else {
+            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        }
+        bootstrap = new ClientBootstrap(factory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("sendBufferSize", buffer_size);
+        bootstrap.setOption("keepAlive", true);
+
+        // Set up the pipeline factory.
+        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+
+        // Start the connection attempt.
+        remote_addr = new InetSocketAddress(host, port);
+        bootstrap.connect(remote_addr);
+    }
+
+    /**
+     * We will retry connection with exponential back-off policy
+     */
+    void reconnect() {
+        try {
+            int tried_count = retries.incrementAndGet();
+            if (tried_count <= max_retries) {
+                Thread.sleep(getSleepTimeMs());
+                LOG.info("Reconnect ... [{}]", tried_count);
+                bootstrap.connect(remote_addr);
+                LOG.debug("connection started...");
+            } else {
+                LOG.warn("Remote address is not reachable. We will close this client.");
+                close();
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("connection failed", e);
+        }
+    }
+
+    /**
+     * # of milliseconds to wait per exponential back-off policy
+     */
+    private int getSleepTimeMs()
+    {
+        int backoff = 1 << retries.get();
+        int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
+        if ( sleepMs > max_sleep_ms )
+            sleepMs = max_sleep_ms;
+        return sleepMs;
+    }
+
+    /**
+     * Enqueue a task message to be sent to server
+     */
+    public void send(int task, byte[] message) {
+        //throw exception if the client is being closed
+        if (being_closed.get()) {
+            throw new RuntimeException("Client is being closed, and does not take requests any more");
+        }
+
+        try {
+            message_queue.put(new TaskMessage(task, message));
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Take all enqueued messages from queue
+     * @return
+     * @throws InterruptedException
+     */
+    MessageBatch takeMessages()  throws InterruptedException {
+        //1st message
+        MessageBatch batch = new MessageBatch(buffer_size);
+        Object msg = message_queue.take();
+        batch.add(msg);
+
+        //we will discard any message after CLOSE
+        if (msg==ControlMessage.CLOSE_MESSAGE)
+            return batch;
+
+        while (!batch.isFull()) {
+            //peek the next message
+            msg = message_queue.peek();
+            //no more messages
+            if (msg == null) break;
+
+            //we will discard any message after CLOSE
+            if (msg==ControlMessage.CLOSE_MESSAGE) {
+                message_queue.take();
+                batch.add(msg);
+                break;
+            }
+
+            //try to add this msg into batch
+            if (!batch.tryAdd((TaskMessage) msg))
+                break;
+
+            //remove this message
+            message_queue.take();
+        }
+
+        return batch;
+    }
+
+    /**
+     * gracefully close this client.
+     *
+     * We will send all existing requests, and then invoke close_n_release() method
+     */
+    public synchronized void close() {
+        if (!being_closed.get()) {
+            //enqueue a CLOSE message so that shutdown() will be invoked
+            try {
+                message_queue.put(ControlMessage.CLOSE_MESSAGE);
+                being_closed.set(true);
+            } catch (InterruptedException e) {
+                close_n_release();
+            }
+        }
+    }
+
+    /**
+     * close_n_release() is invoked after all messages have been sent.
+     */
+    void  close_n_release() {
+        if (channelRef.get() != null)
+            channelRef.get().close().awaitUninterruptibly();
+
+        //we need to release resources
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                factory.releaseExternalResources();
+            }}).start();
+    }
+
+    public TaskMessage recv(int flags) {
+        throw new RuntimeException("Client connection should not receive any messages");
+    }
+
+    void setChannel(Channel channel) {
+        channelRef.set(channel);
+        //reset retries
+        if (channel != null)
+            retries.set(0);
+    }
+
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
new file mode 100644
index 0000000..018e0f9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IContext;
+
+import java.util.Map;
+import java.util.Vector;
+
+public class Context implements IContext {
+    @SuppressWarnings("rawtypes")
+    private Map storm_conf;
+    private volatile Vector<IConnection> connections;
+    
+    /**
+     * initialization per Storm configuration 
+     */
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map storm_conf) {
+       this.storm_conf = storm_conf;
+       connections = new Vector<IConnection>(); 
+    }
+
+    /**
+     * establish a server with a binding port
+     */
+    public IConnection bind(String storm_id, int port) {
+        IConnection server = new Server(storm_conf, port);
+        connections.add(server);
+        return server;
+    }
+
+    /**
+     * establish a connection to a remote server
+     */
+    public IConnection connect(String storm_id, String host, int port) {        
+        IConnection client =  new Client(storm_conf, host, port);
+        connections.add(client);
+        return client;
+    }
+
+    /**
+     * terminate this context
+     */
+    public void term() {
+        for (IConnection conn : connections) {
+            conn.close();
+        }
+        connections = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
new file mode 100644
index 0000000..4cc2040
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+enum ControlMessage {
+    CLOSE_MESSAGE((short)-100),
+    EOB_MESSAGE((short)-201),
+    OK_RESPONSE((short)-200),
+    FAILURE_RESPONSE((short)-400);
+
+    private short code;
+
+    //private constructor
+    private ControlMessage(short code) {
+        this.code = code;
+    }
+
+    /**
+     * Return a control message per an encoded status code
+     * @param encoded
+     * @return
+     */
+    static ControlMessage mkMessage(short encoded) {
+        for(ControlMessage cm: ControlMessage.values()) {
+          if(encoded == cm.code) return cm;
+        }
+        return null;
+    }
+
+    int encodeLength() {
+        return 2; //short
+    }
+    
+    /**
+     * encode the current Control Message into a channel buffer
+     * @throws Exception
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
+        write(bout);
+        bout.close();
+        return bout.buffer();
+    }
+
+    void write(ChannelBufferOutputStream bout) throws Exception {
+        bout.writeShort(code);        
+    } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
new file mode 100644
index 0000000..a9d46a2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -0,0 +1,151 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+    private int buffer_size;
+    private ArrayList<Object> msgs;
+    private int encoded_length;
+
+    MessageBatch(int buffer_size) {
+        this.buffer_size = buffer_size;
+        msgs = new ArrayList<Object>();
+        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+    }
+
+    void add(Object obj) {
+        if (obj == null)
+            throw new RuntimeException("null object forbidded in message batch");
+
+        if (obj instanceof TaskMessage) {
+            TaskMessage msg = (TaskMessage)obj;
+            msgs.add(msg);
+            encoded_length += msgEncodeLength(msg);
+            return;
+        }
+
+        if (obj instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage)obj;
+            msgs.add(msg);
+            encoded_length += msg.encodeLength();
+            return;
+        }
+
+        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
+    }
+
+    void remove(Object obj) {
+        if (obj == null) return;
+
+        if (obj instanceof TaskMessage) {
+            TaskMessage msg = (TaskMessage)obj;
+            msgs.remove(msg);
+            encoded_length -= msgEncodeLength(msg);
+            return;
+        }
+
+        if (obj instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage)obj;
+            msgs.remove(msg);
+            encoded_length -= msg.encodeLength();
+            return;
+        }
+    }
+
+    Object get(int index) {
+        return msgs.get(index);
+    }
+
+    /**
+     * try to add a TaskMessage to a batch
+     * @param taskMsg
+     * @return false if the msg could not be added due to buffer size limit; true otherwise
+     */
+    boolean tryAdd(TaskMessage taskMsg) {
+        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
+            return false;
+        add(taskMsg);
+        return true;
+    }
+
+    private int msgEncodeLength(TaskMessage taskMsg) {
+        if (taskMsg == null) return 0;
+
+        int size = 6; //INT + SHORT
+        if (taskMsg.message() != null) 
+            size += taskMsg.message().length;
+        return size;
+    }
+
+    /**
+     * Has this batch used up allowed buffer size
+     * @return
+     */
+    boolean isFull() {
+        return encoded_length >= buffer_size;
+    }
+
+    /**
+     * true if this batch doesn't have any messages 
+     * @return
+     */
+    boolean isEmpty() {
+        return msgs.isEmpty();
+    }
+
+    /**
+     * # of msgs in this batch
+     * @return
+     */
+    int size() {
+        return msgs.size();
+    }
+
+    /**
+     * create a buffer containing the encoding of this batch
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+        
+        for (Object msg : msgs) 
+            if (msg instanceof TaskMessage)
+                writeTaskMessage(bout, (TaskMessage)msg);
+            else
+                ((ControlMessage)msg).write(bout);
+        
+        //add a END_OF_BATCH indicator
+        ControlMessage.EOB_MESSAGE.write(bout);
+
+        bout.close();
+
+        return bout.buffer();
+    }
+
+    /**
+     * write a TaskMessage into a stream
+     *
+     * Each TaskMessage is encoded as:
+     *  task ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]     *  
+     */
+    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+        int payload_len = 0;
+        if (message.message() != null)
+            payload_len =  message.message().length;
+
+        int task_id = message.task();
+        if (task_id > Short.MAX_VALUE)
+            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+        
+        bout.writeShort((short)task_id);
+        bout.writeInt(payload_len);
+        if (payload_len >0)
+            bout.write(message.message());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
new file mode 100644
index 0000000..76776a9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -0,0 +1,68 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class MessageDecoder extends FrameDecoder {    
+    /*
+     * Each ControlMessage is encoded as:
+     *  code (<0) ... short(2)
+     * Each TaskMessage is encoded as:
+     *  task (>=0) ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]     *  
+     */
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+        // Make sure that we have received at least a short 
+        if (buf.readableBytes() < 2) {
+            //need more data
+            return null;
+        }
+
+        // Mark the current buffer position before reading task/len field
+        // because the whole frame might not be in the buffer yet.
+        // We will reset the buffer position to the marked position if
+        // there's not enough bytes in the buffer.
+        buf.markReaderIndex();
+
+        //read the short field
+        short code = buf.readShort();
+        
+        //case 1: Control message
+        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+        if (ctrl_msg != null) return ctrl_msg;
+        
+        //case 2: task Message
+        short task = code;
+        
+        // Make sure that we have received at least an integer (length) 
+        if (buf.readableBytes() < 4) {
+            //need more data
+            buf.resetReaderIndex();
+            return null;
+        }
+
+        // Read the length field.
+        int length = buf.readInt();
+        if (length<=0) {
+            return new TaskMessage(task, null);
+        }
+        
+        // Make sure if there's enough bytes in the buffer.
+        if (buf.readableBytes() < length) {
+            // The whole bytes were not received yet - return null.
+            buf.resetReaderIndex();
+            return null;
+        }
+
+        // There's enough bytes in the buffer. Read it.
+        ChannelBuffer payload = buf.readBytes(length);
+
+        // Successfully decoded a frame.
+        // Return a TaskMessage object
+        return new TaskMessage(task,payload.array());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
new file mode 100644
index 0000000..c0ac8f1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@ -0,0 +1,22 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {    
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+        if (obj instanceof ControlMessage) {
+            return ((ControlMessage)obj).buffer();
+        }
+
+        if (obj instanceof MessageBatch) {
+            return ((MessageBatch)obj).buffer();
+        } 
+        
+        throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
new file mode 100644
index 0000000..bf6825c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -0,0 +1,119 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class Server implements IConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
+    @SuppressWarnings("rawtypes")
+    Map storm_conf;
+    int port;
+    private LinkedBlockingQueue<TaskMessage> message_queue;
+    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
+    final ChannelFactory factory;
+    final ServerBootstrap bootstrap;
+
+    @SuppressWarnings("rawtypes")
+    Server(Map storm_conf, int port) {
+        this.storm_conf = storm_conf;
+        this.port = port;
+        message_queue = new LinkedBlockingQueue<TaskMessage>();
+
+        // Configure the server.
+        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+
+        if (maxWorkers > 0) {
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+        } else {
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        }
+        bootstrap = new ServerBootstrap(factory);
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.receiveBufferSize", buffer_size);
+        bootstrap.setOption("child.keepAlive", true);
+
+        // Set up the pipeline factory.
+        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+
+        // Bind and start to accept incoming connections.
+        Channel channel = bootstrap.bind(new InetSocketAddress(port));
+        allChannels.add(channel);
+    }
+
+    /**
+     * enqueue a received message 
+     * @param message
+     * @throws InterruptedException
+     */
+    protected void enqueue(TaskMessage message) throws InterruptedException {
+        message_queue.put(message);
+        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
+    }
+    
+    /**
+     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
+     */
+    public TaskMessage recv(int flags)  {
+        if ((flags & 0x01) == 0x01) { 
+            //non-blocking
+            return message_queue.poll();
+        } else {
+            try {
+                TaskMessage request = message_queue.take();
+                LOG.debug("request to be processed: {}", request);
+                return request;
+            } catch (InterruptedException e) {
+                LOG.info("exception within msg receiving", e);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * register a newly created channel
+     * @param channel
+     */
+    protected void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+    
+    /**
+     * close a channel
+     * @param channel
+     */
+    protected void closeChannel(Channel channel) {
+        channel.close().awaitUninterruptibly();
+        allChannels.remove(channel);
+    }
+
+    /**
+     * close all channels, and release resources
+     */
+    public synchronized void close() {
+        if (allChannels != null) {  
+            allChannels.close().awaitUninterruptibly();
+            factory.releaseExternalResources();
+            allChannels = null;
+        }
+    }
+
+    public void send(int task, byte[] message) {
+        throw new RuntimeException("Server connection should not send any messages");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..6fbfb1c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,104 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+    private Client client;
+    private AtomicBoolean being_closed;
+    long start_time; 
+    
+    StormClientHandler(Client client) {
+        this.client = client;
+        being_closed = new AtomicBoolean(false);
+        start_time = System.currentTimeMillis();
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
+        //register the newly established channel
+        Channel channel = event.getChannel();
+        client.setChannel(channel);
+        LOG.debug("connection established to a remote host");
+        
+        //send next request
+        try {
+            sendRequests(channel, client.takeMessages());
+        } catch (InterruptedException e) {
+            channel.close();
+        }
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
+        
+        //examine the response message from server
+        ControlMessage msg = (ControlMessage)event.getMessage();
+        if (msg==ControlMessage.FAILURE_RESPONSE)
+            LOG.info("failure response:{}", msg);
+
+        //send next request
+        Channel channel = event.getChannel();
+        try {
+            sendRequests(channel, client.takeMessages());
+        } catch (InterruptedException e) {
+            channel.close();
+        }
+    }
+
+    /**
+     * Retrieve a request from message queue, and send to server
+     * @param channel
+     */
+    private void sendRequests(Channel channel, final MessageBatch requests) {
+        if (requests==null || requests.size()==0 || being_closed.get()) return;
+
+        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
+        Object last_msg = requests.get(requests.size()-1);
+        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
+            being_closed.set(true);
+            requests.remove(last_msg);
+        }
+
+        //we may don't need do anything if no requests found
+        if (requests.isEmpty()) {
+            if (being_closed.get())
+                client.close_n_release();
+            return;
+        }
+
+        //write request into socket channel
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+                if (!future.isSuccess()) {
+                    LOG.info("failed to send requests:", future.getCause());
+                    future.getChannel().close();
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+                }
+                if (being_closed.get())
+                    client.close_n_release();
+            }
+        });
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        Throwable cause = event.getCause();
+        if (!(cause instanceof ConnectException)) {
+            LOG.info("Connection failed:", cause);
+        } 
+        if (!being_closed.get()) {
+            client.setChannel(null);
+            client.reconnect();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
new file mode 100644
index 0000000..91c513a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -0,0 +1,27 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+class StormClientPipelineFactory implements ChannelPipelineFactory {
+    private Client client;
+
+    StormClientPipelineFactory(Client client) {
+        this.client = client;        
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+        // Create a default pipeline implementation.
+        ChannelPipeline pipeline = Channels.pipeline();
+
+        // Decoder
+        pipeline.addLast("decoder", new MessageDecoder());
+        // Encoder
+        pipeline.addLast("encoder", new MessageEncoder());
+        // business logic.
+        pipeline.addLast("handler", new StormClientHandler(client));
+
+        return pipeline;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
new file mode 100644
index 0000000..9a5aaed
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -0,0 +1,53 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class StormServerHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+    Server server;
+    private AtomicInteger failure_count; 
+    
+    StormServerHandler(Server server) {
+        this.server = server;
+        failure_count = new AtomicInteger(0);
+    }
+    
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+        server.addChannel(e.getChannel());
+    }
+    
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+        Object msg = e.getMessage();  
+        if (msg == null) return;
+
+        //end of batch?
+        if (msg==ControlMessage.EOB_MESSAGE) {
+            Channel channel = ctx.getChannel();
+            LOG.debug("Send back response ...");
+            if (failure_count.get()==0)
+                channel.write(ControlMessage.OK_RESPONSE);
+            else channel.write(ControlMessage.FAILURE_RESPONSE);
+            return;
+        }
+        
+        //enqueue the received message for processing
+        try {
+            server.enqueue((TaskMessage)msg);
+        } catch (InterruptedException e1) {
+            LOG.info("failed to enqueue a request message", e);
+            failure_count.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        server.closeChannel(e.getChannel());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
new file mode 100644
index 0000000..56b0834
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -0,0 +1,28 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+class StormServerPipelineFactory implements  ChannelPipelineFactory {
+    private Server server;
+    
+    StormServerPipelineFactory(Server server) {
+        this.server = server;        
+    }
+    
+    public ChannelPipeline getPipeline() throws Exception {
+        // Create a default pipeline implementation.
+        ChannelPipeline pipeline = Channels.pipeline();
+
+        // Decoder
+        pipeline.addLast("decoder", new MessageDecoder());
+        // Encoder
+        pipeline.addLast("encoder", new MessageEncoder());
+        // business logic.
+        pipeline.addLast("handler", new StormServerHandler(server));
+
+        return pipeline;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
new file mode 100644
index 0000000..eefcb48
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -0,0 +1,44 @@
+(ns backtype.storm.messaging.netty-integration-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TransportFactory])
+  (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
+  (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(deftest test-integration
+  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
+                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
+                                                    STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
+                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                                                    }]
+    (let [topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+                                               :parallelism-hint 6)})
+          results (complete-topology cluster
+                                     topology
+                                     ;; important for test that
+                                     ;; #tuples = multiple of 4 and 6
+                                     :storm-conf {TOPOLOGY-WORKERS 3}
+                                     :mock-sources {"1" [["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ["a"] ["b"]
+                                                         ]}
+                                     )]
+      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+               (read-tuples results "2"))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
new file mode 100644
index 0000000..12ebe5d
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -0,0 +1,97 @@
+(ns backtype.storm.messaging.netty-unit-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TransportFactory])
+  (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(def port 6700) 
+(def task 1) 
+
+(deftest test-basic
+  (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)
+        _ (.send client task (.getBytes req_msg))
+        resp (.recv server 0)]
+    (is (= task (.task resp)))
+    (is (= req_msg (String. (.message resp))))
+    (.close client)
+    (.close server)
+    (.term context)))    
+
+(deftest test-large-msg
+  (let [req_msg (apply str (repeat 2048000 'c')) 
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)
+        _ (.send client task (.getBytes req_msg))
+        resp (.recv server 0)]
+    (is (= task (.task resp)))
+    (is (= req_msg (String. (.message resp))))
+    (.close client)
+    (.close server)
+    (.term context)))    
+    
+(deftest test-server-delayed
+    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        client (.connect context nil "localhost" port)
+        _ (.send client task (.getBytes req_msg))
+        _ (Thread/sleep 1000)
+        server (.bind context nil port)
+        resp (.recv server 0)]
+    (is (= task (.task resp)))
+    (is (= req_msg (String. (.message resp))))
+    (.close client)
+    (.close server)
+    (.term context)))    
+
+(deftest test-batch
+  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)]
+    (doseq [num  (range 1 100000)]
+      (let [req_msg (str num)]
+        (.send client task (.getBytes req_msg))))
+    (doseq [num  (range 1 100000)]
+      (let [req_msg (str num)
+            resp (.recv server 0)
+            resp_msg (String. (.message resp))]
+        (is (= req_msg resp_msg))))
+    (.close client)
+    (.close server)
+    (.term context)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 6b44ea1..3c61cec 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -14,7 +14,7 @@
                                                       STORM-LOCAL-MODE-ZMQ 
                                                       (if transport-on? true false) 
                                                       STORM-MESSAGING-TRANSPORT 
-                                                      "backtype.storm.messaging.zmq"}]
+                                                      "backtype.storm.messaging.netty.Context"}]
       (let [topology (thrift/mk-topology
                        {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)}
                        {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/zilch/test/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/zilch/test/mq.clj b/storm-core/test/clj/zilch/test/mq.clj
deleted file mode 100644
index 756d29b..0000000
--- a/storm-core/test/clj/zilch/test/mq.clj
+++ /dev/null
@@ -1,86 +0,0 @@
-(ns zilch.test.mq
-  (:use clojure.test)
-  (:import [java.util Arrays UUID])
-  (:require [zilch.mq :as mq]))
-
-(defn uuid [] (str (UUID/randomUUID)))
-
-(defn random-msg []
-  (byte-array (map byte (for [i (range (Integer. (int (rand 100))))]
-    (Integer. (int (rand 100)))
-    ))))
-
-(def url
-     (str "inproc://" (uuid))
-     ;; (str "ipc://" (uuid))
-     ;; (str "tcp://127.0.0.1:" (+ 4000 (Math/round (rand 1000)))))
-     )
-
-(deftest zilch
-  (testing "zilch"
-    (testing "should be able to"
-
-      (testing "push / pull"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pull)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/push)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  push (future (mq/send s1 msg))
-                  pull (future (mq/recv s0))]
-              (is (Arrays/equals msg @pull))))))
-
-      (testing "pub / sub"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pub)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/sub)
-                             (mq/subscribe)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  pub (future (mq/send s0 msg))
-                  sub (future (mq/recv s1))]
-              (is (Arrays/equals msg @sub))))))
-
-      (testing "pair / pair"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pair)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/pair)
-                             (mq/connect url))]
-            (let [msg0 (random-msg)
-                  pair0 (future (mq/send s0 msg0)
-                                (mq/recv s0))
-                  msg1 (random-msg)
-                  pair1 (future (mq/send s1 msg1)
-                                (mq/recv s1))]
-              (is (Arrays/equals msg1 @pair0))
-              (is (Arrays/equals msg0 @pair1))))))
-
-      (testing "req / rep"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/rep)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/req)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  req (future (mq/send s1 msg)
-                              (mq/recv s1))
-                  rep (future (mq/recv s0)
-                              (mq/send s0 msg))]
-              (is (Arrays/equals msg @req))))))
-
-      (testing "req / xrep")
-
-      (testing "xreq / rep")
-
-      (testing "xreq / xrep"))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/project.clj
----------------------------------------------------------------------
diff --git a/storm-netty/project.clj b/storm-netty/project.clj
deleted file mode 100644
index 24905bf..0000000
--- a/storm-netty/project.clj
+++ /dev/null
@@ -1,13 +0,0 @@
-(def ROOT-DIR (subs *file* 0 (- (count *file*) (count "project.clj"))))
-(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp (.trim)))
-
-(eval `(defproject storm/storm-netty ~VERSION
-  :dependencies [[storm/storm-core ~VERSION]
-                 [io.netty/netty "3.6.3.Final"]]
-  :java-source-paths ["src/jvm"]
-  :test-paths ["test/clj"]
-  :profiles {:release {}}
-  :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
-  :target-path "target"
-  :javac-options ["-target" "1.6" "-source" "1.6"]
-  :aot :all))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
deleted file mode 100644
index 91e4bd4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Client implements IConnection {
-    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
-    private AtomicReference<Channel> channelRef;
-    private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-    private AtomicInteger retries;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private final AtomicBoolean being_closed;
-
-    @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, String host, int port) {
-        message_queue = new LinkedBlockingQueue<Object>();
-        retries = new AtomicInteger(0);
-        channelRef = new AtomicReference<Channel>(null);
-        being_closed = new AtomicBoolean(false);
-
-        // Configure
-        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
-        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-
-        if (maxWorkers > 0) {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
-        } else {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        }
-        bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", buffer_size);
-        bootstrap.setOption("keepAlive", true);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
-
-        // Start the connection attempt.
-        remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
-    }
-
-    /**
-     * We will retry connection with exponential back-off policy
-     */
-    void reconnect() {
-        try {
-            int tried_count = retries.incrementAndGet();
-            if (tried_count <= max_retries) {
-                Thread.sleep(getSleepTimeMs());
-                LOG.info("Reconnect ... [{}]", tried_count);
-                bootstrap.connect(remote_addr);
-                LOG.debug("connection started...");
-            } else {
-                LOG.warn("Remote address is not reachable. We will close this client.");
-                close();
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("connection failed", e);
-        }
-    }
-
-    /**
-     * # of milliseconds to wait per exponential back-off policy
-     */
-    private int getSleepTimeMs()
-    {
-        int backoff = 1 << retries.get();
-        int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
-        if ( sleepMs > max_sleep_ms )
-            sleepMs = max_sleep_ms;
-        return sleepMs;
-    }
-
-    /**
-     * Enqueue a task message to be sent to server
-     */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
-            throw new RuntimeException("Client is being closed, and does not take requests any more");
-        }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Take all enqueued messages from queue
-     * @return
-     * @throws InterruptedException
-     */
-    MessageBatch takeMessages()  throws InterruptedException {
-        //1st message
-        MessageBatch batch = new MessageBatch(buffer_size);
-        Object msg = message_queue.take();
-        batch.add(msg);
-
-        //we will discard any message after CLOSE
-        if (msg==ControlMessage.CLOSE_MESSAGE)
-            return batch;
-
-        while (!batch.isFull()) {
-            //peek the next message
-            msg = message_queue.peek();
-            //no more messages
-            if (msg == null) break;
-
-            //we will discard any message after CLOSE
-            if (msg==ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                batch.add(msg);
-                break;
-            }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
-        }
-
-        return batch;
-    }
-
-    /**
-     * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
-     */
-    public synchronized void close() {
-        if (!being_closed.get()) {
-            //enqueue a CLOSE message so that shutdown() will be invoked
-            try {
-                message_queue.put(ControlMessage.CLOSE_MESSAGE);
-                being_closed.set(true);
-            } catch (InterruptedException e) {
-                close_n_release();
-            }
-        }
-    }
-
-    /**
-     * close_n_release() is invoked after all messages have been sent.
-     */
-    void  close_n_release() {
-        if (channelRef.get() != null)
-            channelRef.get().close().awaitUninterruptibly();
-
-        //we need to release resources
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                factory.releaseExternalResources();
-            }}).start();
-    }
-
-    public TaskMessage recv(int flags) {
-        throw new RuntimeException("Client connection should not receive any messages");
-    }
-
-    void setChannel(Channel channel) {
-        channelRef.set(channel);
-        //reset retries
-        if (channel != null)
-            retries.set(0);
-    }
-
-}
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
deleted file mode 100644
index bebd7b6..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.Map;
-import java.util.Vector;
-
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.IContext;
-
-public class Context implements IContext {
-    @SuppressWarnings("rawtypes")
-    private Map storm_conf;
-    private volatile Vector<IConnection> connections;
-    
-    /**
-     * initialization per Storm configuration 
-     */
-    @SuppressWarnings("rawtypes")
-    public void prepare(Map storm_conf) {
-       this.storm_conf = storm_conf;
-       connections = new Vector<IConnection>(); 
-    }
-
-    /**
-     * establish a server with a binding port
-     */
-    public IConnection bind(String storm_id, int port) {
-        IConnection server = new Server(storm_conf, port);
-        connections.add(server);
-        return server;
-    }
-
-    /**
-     * establish a connection to a remote server
-     */
-    public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, host, port);
-        connections.add(client);
-        return client;
-    }
-
-    /**
-     * terminate this context
-     */
-    public void term() {
-        for (IConnection conn : connections) {
-            conn.close();
-        }
-        connections = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
deleted file mode 100644
index 8b90005..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-enum ControlMessage {
-    CLOSE_MESSAGE((short)-100),
-    EOB_MESSAGE((short)-201),
-    OK_RESPONSE((short)-200),
-    FAILURE_RESPONSE((short)-400);
-
-    private short code;
-
-    //private constructor
-    private ControlMessage(short code) {
-        this.code = code;
-    }
-
-    /**
-     * Return a control message per an encoded status code
-     * @param encoded
-     * @return
-     */
-    static ControlMessage mkMessage(short encoded) {
-        for(ControlMessage cm: ControlMessage.values()) {
-          if(encoded == cm.code) return cm;
-        }
-        return null;
-    }
-
-    int encodeLength() {
-        return 2; //short
-    }
-    
-    /**
-     * encode the current Control Message into a channel buffer
-     * @throws Exception
-     */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
-        write(bout);
-        bout.close();
-        return bout.buffer();
-    }
-
-    void write(ChannelBufferOutputStream bout) throws Exception {
-        bout.writeShort(code);        
-    } 
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
deleted file mode 100644
index a2c52f4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.ArrayList;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-import backtype.storm.messaging.TaskMessage;
-
-class MessageBatch {
-    private int buffer_size;
-    private ArrayList<Object> msgs;
-    private int encoded_length;
-
-    MessageBatch(int buffer_size) {
-        this.buffer_size = buffer_size;
-        msgs = new ArrayList<Object>();
-        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
-    }
-
-    void add(Object obj) {
-        if (obj == null)
-            throw new RuntimeException("null object forbidded in message batch");
-
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.add(msg);
-            encoded_length += msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.add(msg);
-            encoded_length += msg.encodeLength();
-            return;
-        }
-
-        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
-    }
-
-    void remove(Object obj) {
-        if (obj == null) return;
-
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msg.encodeLength();
-            return;
-        }
-    }
-
-    Object get(int index) {
-        return msgs.get(index);
-    }
-
-    /**
-     * try to add a TaskMessage to a batch
-     * @param taskMsg
-     * @return false if the msg could not be added due to buffer size limit; true otherwise
-     */
-    boolean tryAdd(TaskMessage taskMsg) {
-        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
-            return false;
-        add(taskMsg);
-        return true;
-    }
-
-    private int msgEncodeLength(TaskMessage taskMsg) {
-        if (taskMsg == null) return 0;
-
-        int size = 6; //INT + SHORT
-        if (taskMsg.message() != null) 
-            size += taskMsg.message().length;
-        return size;
-    }
-
-    /**
-     * Has this batch used up allowed buffer size
-     * @return
-     */
-    boolean isFull() {
-        return encoded_length >= buffer_size;
-    }
-
-    /**
-     * true if this batch doesn't have any messages 
-     * @return
-     */
-    boolean isEmpty() {
-        return msgs.isEmpty();
-    }
-
-    /**
-     * # of msgs in this batch
-     * @return
-     */
-    int size() {
-        return msgs.size();
-    }
-
-    /**
-     * create a buffer containing the encoding of this batch
-     */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
-        
-        for (Object msg : msgs) 
-            if (msg instanceof TaskMessage)
-                writeTaskMessage(bout, (TaskMessage)msg);
-            else
-                ((ControlMessage)msg).write(bout);
-        
-        //add a END_OF_BATCH indicator
-        ControlMessage.EOB_MESSAGE.write(bout);
-
-        bout.close();
-
-        return bout.buffer();
-    }
-
-    /**
-     * write a TaskMessage into a stream
-     *
-     * Each TaskMessage is encoded as:
-     *  task ... short(2)
-     *  len ... int(4)
-     *  payload ... byte[]     *  
-     */
-    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
-        int payload_len = 0;
-        if (message.message() != null)
-            payload_len =  message.message().length;
-
-        int task_id = message.task();
-        if (task_id > Short.MAX_VALUE)
-            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
-        
-        bout.writeShort((short)task_id);
-        bout.writeInt(payload_len);
-        if (payload_len >0)
-            bout.write(message.message());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
deleted file mode 100644
index 8190e44..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
-import backtype.storm.messaging.TaskMessage;
-
-public class MessageDecoder extends FrameDecoder {    
-    /*
-     * Each ControlMessage is encoded as:
-     *  code (<0) ... short(2)
-     * Each TaskMessage is encoded as:
-     *  task (>=0) ... short(2)
-     *  len ... int(4)
-     *  payload ... byte[]     *  
-     */
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
-        // Make sure that we have received at least a short 
-        if (buf.readableBytes() < 2) {
-            //need more data
-            return null;
-        }
-
-        // Mark the current buffer position before reading task/len field
-        // because the whole frame might not be in the buffer yet.
-        // We will reset the buffer position to the marked position if
-        // there's not enough bytes in the buffer.
-        buf.markReaderIndex();
-
-        //read the short field
-        short code = buf.readShort();
-        
-        //case 1: Control message
-        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
-        if (ctrl_msg != null) return ctrl_msg;
-        
-        //case 2: task Message
-        short task = code;
-        
-        // Make sure that we have received at least an integer (length) 
-        if (buf.readableBytes() < 4) {
-            //need more data
-            buf.resetReaderIndex();
-            return null;
-        }
-
-        // Read the length field.
-        int length = buf.readInt();
-        if (length<=0) {
-            return new TaskMessage(task, null);
-        }
-        
-        // Make sure if there's enough bytes in the buffer.
-        if (buf.readableBytes() < length) {
-            // The whole bytes were not received yet - return null.
-            buf.resetReaderIndex();
-            return null;
-        }
-
-        // There's enough bytes in the buffer. Read it.
-        ChannelBuffer payload = buf.readBytes(length);
-
-        // Successfully decoded a frame.
-        // Return a TaskMessage object
-        return new TaskMessage(task,payload.array());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
deleted file mode 100644
index c0ac8f1..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-public class MessageEncoder extends OneToOneEncoder {    
-    @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
-        if (obj instanceof ControlMessage) {
-            return ((ControlMessage)obj).buffer();
-        }
-
-        if (obj instanceof MessageBatch) {
-            return ((MessageBatch)obj).buffer();
-        } 
-        
-        throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
deleted file mode 100644
index 4119bbf..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Server implements IConnection {
-    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
-    @SuppressWarnings("rawtypes")
-    Map storm_conf;
-    int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
-    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
-    final ChannelFactory factory;
-    final ServerBootstrap bootstrap;
-
-    @SuppressWarnings("rawtypes")
-    Server(Map storm_conf, int port) {
-        this.storm_conf = storm_conf;
-        this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
-        // Configure the server.
-        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
-
-        if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
-        } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        }
-        bootstrap = new ServerBootstrap(factory);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.receiveBufferSize", buffer_size);
-        bootstrap.setOption("child.keepAlive", true);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
-
-        // Bind and start to accept incoming connections.
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        allChannels.add(channel);
-    }
-
-    /**
-     * enqueue a received message 
-     * @param message
-     * @throws InterruptedException
-     */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
-    
-    /**
-     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
-     */
-    public TaskMessage recv(int flags)  {
-        if ((flags & 0x01) == 0x01) { 
-            //non-blocking
-            return message_queue.poll();
-        } else {
-            try {
-                TaskMessage request = message_queue.take();
-                LOG.debug("request to be processed: {}", request);
-                return request;
-            } catch (InterruptedException e) {
-                LOG.info("exception within msg receiving", e);
-                return null;
-            }
-        }
-    }
-
-    /**
-     * register a newly created channel
-     * @param channel
-     */
-    protected void addChannel(Channel channel) {
-        allChannels.add(channel);
-    }
-    
-    /**
-     * close a channel
-     * @param channel
-     */
-    protected void closeChannel(Channel channel) {
-        channel.close().awaitUninterruptibly();
-        allChannels.remove(channel);
-    }
-
-    /**
-     * close all channels, and release resources
-     */
-    public synchronized void close() {
-        if (allChannels != null) {  
-            allChannels.close().awaitUninterruptibly();
-            factory.releaseExternalResources();
-            allChannels = null;
-        }
-    }
-
-    public void send(int task, byte[] message) {
-        throw new RuntimeException("Server connection should not send any messages");
-    }
-}


[3/4] git commit: Merge branch 'master' into netty-default

Posted by pt...@apache.org.
Merge branch 'master' into netty-default

Conflicts:
	bin/install_zmq.sh
	storm-core/src/clj/backtype/storm/messaging/zmq.clj
	storm-core/src/clj/zilch/mq.clj
	storm-core/test/clj/zilch/test/mq.clj
	storm-netty/project.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/962d5207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/962d5207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/962d5207

Branch: refs/heads/master
Commit: 962d520777368709cf30d6214719bfa1b81bbd4c
Parents: b63ed13 7e40f9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Dec 16 13:21:19 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Dec 16 13:21:19 2013 -0500

----------------------------------------------------------------------
 LICENSE                                         | 285 +++++++++++++++++++
 LICENSE.html                                    | 261 -----------------
 NOTICE                                          |  12 +-
 README.markdown                                 |  23 +-
 VERSION                                         |   2 +-
 bin/build_modules.sh                            |  16 ++
 bin/build_release.sh                            |  17 ++
 bin/javadoc.sh                                  |  16 ++
 bin/storm                                       |  16 ++
 bin/to_maven.sh                                 |  17 ++
 conf/defaults.yaml                              |  18 ++
 conf/jaas_digest.conf                           |  18 ++
 conf/logback.xml                                |  17 +-
 conf/storm.yaml.example                         |  16 ++
 logback/cluster.xml                             |  18 +-
 project.clj                                     |  15 +
 storm-console-logging/project.clj               |  15 +
 storm-core/project.clj                          |  21 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |  15 +
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |  15 +
 storm-core/src/clj/backtype/storm/bootstrap.clj |  15 +
 storm-core/src/clj/backtype/storm/clojure.clj   |  15 +
 storm-core/src/clj/backtype/storm/cluster.clj   |  15 +
 .../src/clj/backtype/storm/command/activate.clj |  15 +
 .../clj/backtype/storm/command/config_value.clj |  15 +
 .../clj/backtype/storm/command/deactivate.clj   |  15 +
 .../backtype/storm/command/dev_zookeeper.clj    |  15 +
 .../backtype/storm/command/kill_topology.clj    |  15 +
 .../src/clj/backtype/storm/command/list.clj     |  15 +
 .../clj/backtype/storm/command/rebalance.clj    |  15 +
 .../backtype/storm/command/shell_submission.clj |  15 +
 storm-core/src/clj/backtype/storm/config.clj    |  15 +
 .../src/clj/backtype/storm/daemon/acker.clj     |  15 +
 .../backtype/storm/daemon/builtin_metrics.clj   |  15 +
 .../src/clj/backtype/storm/daemon/common.clj    |  15 +
 .../src/clj/backtype/storm/daemon/drpc.clj      |  19 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  15 +
 .../src/clj/backtype/storm/daemon/logviewer.clj |  15 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  15 +
 .../clj/backtype/storm/daemon/supervisor.clj    |  15 +
 .../src/clj/backtype/storm/daemon/task.clj      |  15 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  15 +
 storm-core/src/clj/backtype/storm/disruptor.clj |  15 +
 storm-core/src/clj/backtype/storm/event.clj     |  15 +
 storm-core/src/clj/backtype/storm/log.clj       |  15 +
 .../src/clj/backtype/storm/messaging/loader.clj |  15 +
 .../src/clj/backtype/storm/messaging/local.clj  |  15 +
 .../src/clj/backtype/storm/metric/testing.clj   |  15 +
 .../clj/backtype/storm/process_simulator.clj    |  15 +
 .../storm/scheduler/DefaultScheduler.clj        |  15 +
 .../backtype/storm/scheduler/EvenScheduler.clj  |  15 +
 .../storm/scheduler/IsolationScheduler.clj      |  15 +
 storm-core/src/clj/backtype/storm/stats.clj     |  15 +
 storm-core/src/clj/backtype/storm/testing.clj   |  15 +
 storm-core/src/clj/backtype/storm/testing4j.clj |  15 +
 storm-core/src/clj/backtype/storm/thrift.clj    |  15 +
 storm-core/src/clj/backtype/storm/timer.clj     |  15 +
 storm-core/src/clj/backtype/storm/tuple.clj     |  15 +
 storm-core/src/clj/backtype/storm/ui/core.clj   |  15 +
 .../src/clj/backtype/storm/ui/helpers.clj       |  15 +
 storm-core/src/clj/backtype/storm/util.clj      |  15 +
 storm-core/src/clj/backtype/storm/zookeeper.clj |  15 +
 storm-core/src/clj/storm/trident/testing.clj    |  15 +
 storm-core/src/dev/resources/tester_bolt.py     |  18 ++
 storm-core/src/dev/resources/tester_bolt.rb     |  18 ++
 storm-core/src/dev/resources/tester_spout.py    |  18 ++
 storm-core/src/dev/resources/tester_spout.rb    |  17 ++
 storm-core/src/genthrift.sh                     |  16 ++
 storm-core/src/jvm/backtype/storm/Config.java   |  17 ++
 .../jvm/backtype/storm/ConfigValidation.java    |  17 ++
 .../src/jvm/backtype/storm/Constants.java       |  17 ++
 .../src/jvm/backtype/storm/ILocalCluster.java   |  17 ++
 .../src/jvm/backtype/storm/ILocalDRPC.java      |  17 ++
 .../src/jvm/backtype/storm/StormSubmitter.java  |  17 ++
 .../jvm/backtype/storm/clojure/ClojureBolt.java |  17 ++
 .../backtype/storm/clojure/ClojureSpout.java    |  17 ++
 .../backtype/storm/clojure/RichShellBolt.java   |  17 ++
 .../backtype/storm/clojure/RichShellSpout.java  |  17 ++
 .../storm/coordination/BatchBoltExecutor.java   |  17 ++
 .../coordination/BatchOutputCollector.java      |  17 ++
 .../coordination/BatchOutputCollectorImpl.java  |  17 ++
 .../coordination/BatchSubtopologyBuilder.java   |  17 ++
 .../storm/coordination/CoordinatedBolt.java     |  17 ++
 .../backtype/storm/coordination/IBatchBolt.java |  17 ++
 .../jvm/backtype/storm/daemon/Shutdownable.java |  17 ++
 .../storm/drpc/DRPCInvocationsClient.java       |  17 ++
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |  17 ++
 .../src/jvm/backtype/storm/drpc/JoinResult.java |  17 ++
 .../jvm/backtype/storm/drpc/KeyedFairBolt.java  |  17 ++
 .../storm/drpc/LinearDRPCInputDeclarer.java     |  17 ++
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  17 ++
 .../jvm/backtype/storm/drpc/PrepareRequest.java |  17 ++
 .../jvm/backtype/storm/drpc/ReturnResults.java  |  17 ++
 .../storm/generated/AlreadyAliveException.java  |  17 ++
 .../src/jvm/backtype/storm/generated/Bolt.java  |  17 ++
 .../jvm/backtype/storm/generated/BoltStats.java |  17 ++
 .../storm/generated/ClusterSummary.java         |  17 ++
 .../storm/generated/ComponentCommon.java        |  17 ++
 .../storm/generated/ComponentObject.java        |  17 ++
 .../storm/generated/DRPCExecutionException.java |  17 ++
 .../backtype/storm/generated/DRPCRequest.java   |  17 ++
 .../storm/generated/DistributedRPC.java         |  17 ++
 .../generated/DistributedRPCInvocations.java    |  17 ++
 .../jvm/backtype/storm/generated/ErrorInfo.java |  17 ++
 .../backtype/storm/generated/ExecutorInfo.java  |  17 ++
 .../storm/generated/ExecutorSpecificStats.java  |  17 ++
 .../backtype/storm/generated/ExecutorStats.java |  17 ++
 .../storm/generated/ExecutorSummary.java        |  17 ++
 .../storm/generated/GlobalStreamId.java         |  17 ++
 .../jvm/backtype/storm/generated/Grouping.java  |  17 ++
 .../generated/InvalidTopologyException.java     |  17 ++
 .../backtype/storm/generated/JavaObject.java    |  17 ++
 .../backtype/storm/generated/JavaObjectArg.java |  17 ++
 .../backtype/storm/generated/KillOptions.java   |  17 ++
 .../jvm/backtype/storm/generated/Nimbus.java    |  17 ++
 .../storm/generated/NotAliveException.java      |  17 ++
 .../backtype/storm/generated/NullStruct.java    |  17 ++
 .../storm/generated/RebalanceOptions.java       |  17 ++
 .../storm/generated/ShellComponent.java         |  17 ++
 .../jvm/backtype/storm/generated/SpoutSpec.java |  17 ++
 .../backtype/storm/generated/SpoutStats.java    |  17 ++
 .../storm/generated/StateSpoutSpec.java         |  17 ++
 .../backtype/storm/generated/StormTopology.java |  17 ++
 .../backtype/storm/generated/StreamInfo.java    |  17 ++
 .../backtype/storm/generated/SubmitOptions.java |  17 ++
 .../storm/generated/SupervisorSummary.java      |  17 ++
 .../backtype/storm/generated/TopologyInfo.java  |  17 ++
 .../storm/generated/TopologyInitialStatus.java  |  17 ++
 .../storm/generated/TopologySummary.java        |  17 ++
 .../storm/grouping/CustomStreamGrouping.java    |  17 ++
 .../jvm/backtype/storm/hooks/BaseTaskHook.java  |  17 ++
 .../src/jvm/backtype/storm/hooks/ITaskHook.java |  17 ++
 .../backtype/storm/hooks/info/BoltAckInfo.java  |  17 ++
 .../storm/hooks/info/BoltExecuteInfo.java       |  17 ++
 .../backtype/storm/hooks/info/BoltFailInfo.java |  17 ++
 .../jvm/backtype/storm/hooks/info/EmitInfo.java |  17 ++
 .../backtype/storm/hooks/info/SpoutAckInfo.java |  17 ++
 .../storm/hooks/info/SpoutFailInfo.java         |  17 ++
 .../backtype/storm/messaging/IConnection.java   |  17 ++
 .../jvm/backtype/storm/messaging/IContext.java  |  17 ++
 .../backtype/storm/messaging/TaskMessage.java   |  17 ++
 .../storm/messaging/TransportFactory.java       |  17 ++
 .../backtype/storm/messaging/netty/Client.java  |  17 ++
 .../backtype/storm/messaging/netty/Context.java |  17 ++
 .../storm/messaging/netty/ControlMessage.java   |  17 ++
 .../storm/messaging/netty/MessageBatch.java     |  17 ++
 .../storm/messaging/netty/MessageDecoder.java   |  17 ++
 .../storm/messaging/netty/MessageEncoder.java   |  17 ++
 .../backtype/storm/messaging/netty/Server.java  |  17 ++
 .../messaging/netty/StormClientHandler.java     |  17 ++
 .../netty/StormClientPipelineFactory.java       |  17 ++
 .../messaging/netty/StormServerHandler.java     |  17 ++
 .../netty/StormServerPipelineFactory.java       |  17 ++
 .../storm/metric/LoggingMetricsConsumer.java    |  17 ++
 .../storm/metric/MetricsConsumerBolt.java       |  17 ++
 .../jvm/backtype/storm/metric/SystemBolt.java   |  17 ++
 .../storm/metric/api/AssignableMetric.java      |  17 ++
 .../storm/metric/api/CombinedMetric.java        |  17 ++
 .../backtype/storm/metric/api/CountMetric.java  |  17 ++
 .../backtype/storm/metric/api/ICombiner.java    |  17 ++
 .../jvm/backtype/storm/metric/api/IMetric.java  |  17 ++
 .../storm/metric/api/IMetricsConsumer.java      |  17 ++
 .../jvm/backtype/storm/metric/api/IReducer.java |  17 ++
 .../storm/metric/api/IStatefulObject.java       |  17 ++
 .../backtype/storm/metric/api/MeanReducer.java  |  17 ++
 .../storm/metric/api/MultiCountMetric.java      |  17 ++
 .../storm/metric/api/MultiReducedMetric.java    |  17 ++
 .../storm/metric/api/ReducedMetric.java         |  17 ++
 .../backtype/storm/metric/api/StateMetric.java  |  17 ++
 .../storm/nimbus/DefaultTopologyValidator.java  |  17 ++
 .../storm/nimbus/ITopologyValidator.java        |  17 ++
 .../backtype/storm/planner/CompoundSpout.java   |  17 ++
 .../backtype/storm/planner/CompoundTask.java    |  17 ++
 .../jvm/backtype/storm/planner/TaskBundle.java  |  17 ++
 .../jvm/backtype/storm/scheduler/Cluster.java   |  17 ++
 .../storm/scheduler/ExecutorDetails.java        |  91 +++---
 .../jvm/backtype/storm/scheduler/INimbus.java   |  17 ++
 .../backtype/storm/scheduler/IScheduler.java    |  17 ++
 .../backtype/storm/scheduler/ISupervisor.java   |  17 ++
 .../storm/scheduler/SchedulerAssignment.java    |  97 ++++---
 .../scheduler/SchedulerAssignmentImpl.java      | 201 +++++++------
 .../storm/scheduler/SupervisorDetails.java      |  17 ++
 .../backtype/storm/scheduler/Topologies.java    |  97 ++++---
 .../storm/scheduler/TopologyDetails.java        |  17 ++
 .../backtype/storm/scheduler/WorkerSlot.java    |  17 ++
 .../backtype/storm/security/auth/AuthUtils.java |  17 ++
 .../storm/security/auth/IAuthorizer.java        |  17 ++
 .../storm/security/auth/ITransportPlugin.java   |  17 ++
 .../storm/security/auth/ReqContext.java         |  17 ++
 .../security/auth/SaslTransportPlugin.java      |  17 ++
 .../security/auth/SimpleTransportPlugin.java    |  17 ++
 .../storm/security/auth/ThriftClient.java       |  17 ++
 .../storm/security/auth/ThriftServer.java       |  17 ++
 .../auth/authorizer/DenyAuthorizer.java         |  17 ++
 .../auth/authorizer/NoopAuthorizer.java         |  17 ++
 .../auth/digest/ClientCallbackHandler.java      |  17 ++
 .../auth/digest/DigestSaslTransportPlugin.java  |  17 ++
 .../auth/digest/ServerCallbackHandler.java      |  17 ++
 .../serialization/BlowfishTupleSerializer.java  |  17 ++
 .../storm/serialization/DefaultKryoFactory.java |  17 ++
 .../storm/serialization/IKryoDecorator.java     |  17 ++
 .../storm/serialization/IKryoFactory.java       |  17 ++
 .../storm/serialization/ITupleDeserializer.java |  17 ++
 .../storm/serialization/ITupleSerializer.java   |  17 ++
 .../serialization/KryoTupleDeserializer.java    |  17 ++
 .../serialization/KryoTupleSerializer.java      |  17 ++
 .../serialization/KryoValuesDeserializer.java   |  17 ++
 .../serialization/KryoValuesSerializer.java     |  17 ++
 .../serialization/SerializableSerializer.java   |  17 ++
 .../serialization/SerializationFactory.java     |  17 ++
 .../types/ArrayListSerializer.java              |  17 ++
 .../serialization/types/HashMapSerializer.java  |  17 ++
 .../serialization/types/HashSetSerializer.java  |  17 ++
 .../types/ListDelegateSerializer.java           |  17 ++
 .../storm/spout/IMultiSchemableSpout.java       |  17 ++
 .../backtype/storm/spout/ISchemableSpout.java   |  17 ++
 .../src/jvm/backtype/storm/spout/ISpout.java    |  17 ++
 .../storm/spout/ISpoutOutputCollector.java      |  17 ++
 .../storm/spout/ISpoutWaitStrategy.java         |  17 ++
 .../jvm/backtype/storm/spout/MultiScheme.java   |  17 ++
 .../storm/spout/NothingEmptyEmitStrategy.java   |  17 ++
 .../backtype/storm/spout/RawMultiScheme.java    |  17 ++
 .../src/jvm/backtype/storm/spout/RawScheme.java |  17 ++
 .../src/jvm/backtype/storm/spout/Scheme.java    |  17 ++
 .../storm/spout/SchemeAsMultiScheme.java        |  17 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |  17 ++
 .../storm/spout/SleepSpoutWaitStrategy.java     |  17 ++
 .../storm/spout/SpoutOutputCollector.java       |  17 ++
 .../jvm/backtype/storm/state/IStateSpout.java   |  17 ++
 .../storm/state/IStateSpoutOutputCollector.java |  17 ++
 .../backtype/storm/state/ISubscribedState.java  |  17 ++
 .../state/ISynchronizeOutputCollector.java      |  17 ++
 .../storm/state/StateSpoutOutputCollector.java  |  17 ++
 .../storm/state/SynchronizeOutputCollector.java |  17 ++
 .../storm/task/GeneralTopologyContext.java      |  17 ++
 .../src/jvm/backtype/storm/task/IBolt.java      |  17 ++
 .../jvm/backtype/storm/task/IErrorReporter.java |  17 ++
 .../backtype/storm/task/IMetricsContext.java    |  17 ++
 .../backtype/storm/task/IOutputCollector.java   |  17 ++
 .../backtype/storm/task/OutputCollector.java    |  17 ++
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  17 ++
 .../backtype/storm/task/TopologyContext.java    |  17 ++
 .../storm/task/WorkerTopologyContext.java       |  17 ++
 .../backtype/storm/testing/AckFailDelegate.java |  17 ++
 .../storm/testing/AckFailMapTracker.java        |  17 ++
 .../jvm/backtype/storm/testing/AckTracker.java  |  17 ++
 .../backtype/storm/testing/BatchNumberList.java |  17 ++
 .../storm/testing/BatchProcessWord.java         |  17 ++
 .../backtype/storm/testing/BatchRepeatA.java    |  17 ++
 .../jvm/backtype/storm/testing/BoltTracker.java |  17 ++
 .../storm/testing/CompleteTopologyParam.java    | 117 ++++----
 .../storm/testing/CountingBatchBolt.java        |  17 ++
 .../storm/testing/CountingCommitBolt.java       |  17 ++
 .../jvm/backtype/storm/testing/FeederSpout.java |  17 ++
 .../jvm/backtype/storm/testing/FixedTuple.java  |  17 ++
 .../backtype/storm/testing/FixedTupleSpout.java |  17 ++
 .../backtype/storm/testing/IdentityBolt.java    |  17 ++
 .../storm/testing/KeyedCountingBatchBolt.java   |  17 ++
 .../testing/KeyedCountingCommitterBolt.java     |  17 ++
 .../storm/testing/KeyedSummingBatchBolt.java    |  17 ++
 .../storm/testing/MemoryTransactionalSpout.java |  17 ++
 .../testing/MemoryTransactionalSpoutMeta.java   |  17 ++
 .../backtype/storm/testing/MkClusterParam.java  |  97 ++++---
 .../backtype/storm/testing/MkTupleParam.java    |  85 +++---
 .../backtype/storm/testing/MockedSources.java   | 103 ++++---
 .../jvm/backtype/storm/testing/NGrouping.java   |  17 ++
 .../storm/testing/NonRichBoltTracker.java       |  17 ++
 .../testing/OpaqueMemoryTransactionalSpout.java |  17 ++
 .../storm/testing/PrepareBatchBolt.java         |  17 ++
 .../backtype/storm/testing/SpoutTracker.java    |  17 ++
 .../storm/testing/TestAggregatesCounter.java    |  17 ++
 .../backtype/storm/testing/TestConfBolt.java    |  17 ++
 .../backtype/storm/testing/TestGlobalCount.java |  17 ++
 .../src/jvm/backtype/storm/testing/TestJob.java |  65 +++--
 .../storm/testing/TestKryoDecorator.java        |  17 ++
 .../backtype/storm/testing/TestPlannerBolt.java |  17 ++
 .../storm/testing/TestPlannerSpout.java         |  17 ++
 .../backtype/storm/testing/TestSerObject.java   |  17 ++
 .../backtype/storm/testing/TestWordCounter.java |  17 ++
 .../backtype/storm/testing/TestWordSpout.java   |  17 ++
 .../backtype/storm/testing/TrackedTopology.java |  51 ++--
 .../storm/testing/TupleCaptureBolt.java         |  17 ++
 .../topology/BaseConfigurationDeclarer.java     |  17 ++
 .../storm/topology/BasicBoltExecutor.java       |  17 ++
 .../storm/topology/BasicOutputCollector.java    |  17 ++
 .../backtype/storm/topology/BoltDeclarer.java   |  17 ++
 .../ComponentConfigurationDeclarer.java         |  17 ++
 .../storm/topology/FailedException.java         |  17 ++
 .../jvm/backtype/storm/topology/IBasicBolt.java |  17 ++
 .../storm/topology/IBasicOutputCollector.java   |  17 ++
 .../jvm/backtype/storm/topology/IComponent.java |  17 ++
 .../jvm/backtype/storm/topology/IRichBolt.java  |  17 ++
 .../jvm/backtype/storm/topology/IRichSpout.java |  17 ++
 .../storm/topology/IRichStateSpout.java         |  17 ++
 .../backtype/storm/topology/InputDeclarer.java  |  17 ++
 .../storm/topology/OutputFieldsDeclarer.java    |  17 ++
 .../storm/topology/OutputFieldsGetter.java      |  17 ++
 .../storm/topology/ReportedFailedException.java |  17 ++
 .../backtype/storm/topology/SpoutDeclarer.java  |  17 ++
 .../storm/topology/TopologyBuilder.java         |  17 ++
 .../storm/topology/base/BaseBasicBolt.java      |  17 ++
 .../storm/topology/base/BaseBatchBolt.java      |  17 ++
 .../storm/topology/base/BaseComponent.java      |  17 ++
 ...BaseOpaquePartitionedTransactionalSpout.java |  17 ++
 .../base/BasePartitionedTransactionalSpout.java |  17 ++
 .../storm/topology/base/BaseRichBolt.java       |  17 ++
 .../storm/topology/base/BaseRichSpout.java      |  17 ++
 .../topology/base/BaseTransactionalBolt.java    |  17 ++
 .../topology/base/BaseTransactionalSpout.java   |  17 ++
 .../storm/transactional/ICommitter.java         |  17 ++
 .../ICommitterTransactionalSpout.java           |  17 ++
 .../transactional/ITransactionalSpout.java      |  17 ++
 .../storm/transactional/TransactionAttempt.java |  17 ++
 .../TransactionalSpoutBatchExecutor.java        |  17 ++
 .../TransactionalSpoutCoordinator.java          |  17 ++
 .../TransactionalTopologyBuilder.java           |  17 ++
 .../IOpaquePartitionedTransactionalSpout.java   |  17 ++
 .../IPartitionedTransactionalSpout.java         |  17 ++
 ...uePartitionedTransactionalSpoutExecutor.java |  17 ++
 .../PartitionedTransactionalSpoutExecutor.java  |  17 ++
 .../state/RotatingTransactionalState.java       |  17 ++
 .../transactional/state/TransactionalState.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Fields.java    |  17 ++
 .../src/jvm/backtype/storm/tuple/MessageId.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  17 ++
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Values.java    |  17 ++
 .../storm/utils/BufferFileInputStream.java      |  17 ++
 .../backtype/storm/utils/CRC32OutputStream.java |  17 ++
 .../backtype/storm/utils/ClojureTimerTask.java  |  17 ++
 .../src/jvm/backtype/storm/utils/Container.java |  17 ++
 .../jvm/backtype/storm/utils/DRPCClient.java    |  17 ++
 .../backtype/storm/utils/DisruptorQueue.java    |  17 ++
 .../storm/utils/IndifferentAccessMap.java       |  17 ++
 .../backtype/storm/utils/InprocMessaging.java   |  17 ++
 .../storm/utils/KeyedRoundRobinQueue.java       |  17 ++
 .../jvm/backtype/storm/utils/ListDelegate.java  |  17 ++
 .../jvm/backtype/storm/utils/LocalState.java    |  17 ++
 .../jvm/backtype/storm/utils/MutableInt.java    |  17 ++
 .../jvm/backtype/storm/utils/MutableLong.java   |  17 ++
 .../jvm/backtype/storm/utils/MutableObject.java |  17 ++
 .../jvm/backtype/storm/utils/NimbusClient.java  |  17 ++
 .../storm/utils/RegisteredGlobalState.java      |  17 ++
 .../jvm/backtype/storm/utils/RotatingMap.java   |  17 ++
 .../backtype/storm/utils/ServiceRegistry.java   |  17 ++
 .../jvm/backtype/storm/utils/ShellProcess.java  |  17 ++
 .../storm/utils/ThriftTopologyUtils.java        |  17 ++
 .../src/jvm/backtype/storm/utils/Time.java      |  17 ++
 .../jvm/backtype/storm/utils/TimeCacheMap.java  |  17 ++
 .../src/jvm/backtype/storm/utils/Utils.java     |  17 ++
 .../backtype/storm/utils/VersionedStore.java    |  17 ++
 .../storm/utils/WindowedTimeThrottler.java      |  17 ++
 .../jvm/backtype/storm/utils/WritableUtils.java |  17 ++
 .../backtype/storm/utils/ZookeeperAuthInfo.java |  17 ++
 storm-core/src/jvm/storm/trident/JoinType.java  |  17 ++
 storm-core/src/jvm/storm/trident/Stream.java    |  17 ++
 .../src/jvm/storm/trident/TridentState.java     |  17 ++
 .../src/jvm/storm/trident/TridentTopology.java  |  17 ++
 .../trident/drpc/ReturnResultsReducer.java      |  17 ++
 .../fluent/ChainedAggregatorDeclarer.java       |  17 ++
 .../fluent/ChainedFullAggregatorDeclarer.java   |  17 ++
 .../ChainedPartitionAggregatorDeclarer.java     |  17 ++
 .../trident/fluent/GlobalAggregationScheme.java |  17 ++
 .../jvm/storm/trident/fluent/GroupedStream.java |  17 ++
 .../trident/fluent/IAggregatableStream.java     |  17 ++
 .../fluent/IChainedAggregatorDeclarer.java      |  17 ++
 .../jvm/storm/trident/fluent/UniqueIdGen.java   |  17 ++
 .../jvm/storm/trident/graph/GraphGrouper.java   |  17 ++
 .../src/jvm/storm/trident/graph/Group.java      |  17 ++
 .../jvm/storm/trident/operation/Aggregator.java |  17 ++
 .../jvm/storm/trident/operation/Assembly.java   |  17 ++
 .../storm/trident/operation/BaseAggregator.java |  17 ++
 .../jvm/storm/trident/operation/BaseFilter.java |  17 ++
 .../storm/trident/operation/BaseFunction.java   |  17 ++
 .../trident/operation/BaseMultiReducer.java     |  17 ++
 .../storm/trident/operation/BaseOperation.java  |  17 ++
 .../trident/operation/CombinerAggregator.java   |  17 ++
 .../storm/trident/operation/EachOperation.java  |  17 ++
 .../src/jvm/storm/trident/operation/Filter.java |  17 ++
 .../jvm/storm/trident/operation/Function.java   |  17 ++
 .../trident/operation/GroupedMultiReducer.java  |  17 ++
 .../storm/trident/operation/MultiReducer.java   |  17 ++
 .../jvm/storm/trident/operation/Operation.java  |  17 ++
 .../trident/operation/ReducerAggregator.java    |  17 ++
 .../trident/operation/TridentCollector.java     |  17 ++
 .../operation/TridentMultiReducerContext.java   |  17 ++
 .../operation/TridentOperationContext.java      |  17 ++
 .../storm/trident/operation/builtin/Count.java  |  17 ++
 .../storm/trident/operation/builtin/Debug.java  |  17 ++
 .../storm/trident/operation/builtin/Equals.java |  17 ++
 .../trident/operation/builtin/FilterNull.java   |  17 ++
 .../storm/trident/operation/builtin/FirstN.java |  17 ++
 .../storm/trident/operation/builtin/MapGet.java |  17 ++
 .../storm/trident/operation/builtin/Negate.java |  17 ++
 .../trident/operation/builtin/SnapshotGet.java  |  17 ++
 .../storm/trident/operation/builtin/Sum.java    |  17 ++
 .../operation/builtin/TupleCollectionGet.java   |  17 ++
 .../operation/impl/CaptureCollector.java        |  17 ++
 .../operation/impl/ChainedAggregatorImpl.java   |  17 ++
 .../trident/operation/impl/ChainedResult.java   |  17 ++
 .../operation/impl/CombinerAggStateUpdater.java |  17 ++
 .../impl/CombinerAggregatorCombineImpl.java     |  17 ++
 .../impl/CombinerAggregatorInitImpl.java        |  17 ++
 .../trident/operation/impl/FilterExecutor.java  |  17 ++
 .../operation/impl/GlobalBatchToPartition.java  |  17 ++
 .../trident/operation/impl/GroupCollector.java  |  17 ++
 .../operation/impl/GroupedAggregator.java       |  17 ++
 .../impl/GroupedMultiReducerExecutor.java       |  17 ++
 .../operation/impl/IdentityMultiReducer.java    |  17 ++
 .../impl/IndexHashBatchToPartition.java         |  17 ++
 .../operation/impl/JoinerMultiReducer.java      |  17 ++
 .../operation/impl/ReducerAggStateUpdater.java  |  17 ++
 .../operation/impl/ReducerAggregatorImpl.java   |  17 ++
 .../storm/trident/operation/impl/Result.java    |  17 ++
 .../operation/impl/SingleEmitAggregator.java    |  17 ++
 .../trident/operation/impl/TrueFilter.java      |  17 ++
 .../storm/trident/partition/GlobalGrouping.java |  17 ++
 .../trident/partition/IdentityGrouping.java     |  17 ++
 .../trident/partition/IndexHashGrouping.java    |  17 ++
 .../storm/trident/planner/BridgeReceiver.java   |  17 ++
 .../src/jvm/storm/trident/planner/Node.java     |  17 ++
 .../storm/trident/planner/NodeStateInfo.java    |  17 ++
 .../storm/trident/planner/PartitionNode.java    |  17 ++
 .../storm/trident/planner/ProcessorContext.java |  17 ++
 .../storm/trident/planner/ProcessorNode.java    |  17 ++
 .../jvm/storm/trident/planner/SpoutNode.java    |  17 ++
 .../storm/trident/planner/SubtopologyBolt.java  |  17 ++
 .../storm/trident/planner/TridentProcessor.java |  17 ++
 .../storm/trident/planner/TupleReceiver.java    |  17 ++
 .../planner/processor/AggregateProcessor.java   |  17 ++
 .../planner/processor/AppendCollector.java      |  17 ++
 .../planner/processor/EachProcessor.java        |  17 ++
 .../planner/processor/FreshCollector.java       |  17 ++
 .../processor/MultiReducerProcessor.java        |  17 ++
 .../processor/PartitionPersistProcessor.java    |  17 ++
 .../planner/processor/ProjectedProcessor.java   |  17 ++
 .../planner/processor/StateQueryProcessor.java  |  17 ++
 .../planner/processor/TridentContext.java       |  17 ++
 .../storm/trident/spout/BatchSpoutExecutor.java |  17 ++
 .../src/jvm/storm/trident/spout/IBatchID.java   |  17 ++
 .../jvm/storm/trident/spout/IBatchSpout.java    |  17 ++
 .../trident/spout/ICommitterTridentSpout.java   |  17 ++
 .../spout/IOpaquePartitionedTridentSpout.java   |  17 ++
 .../trident/spout/IPartitionedTridentSpout.java |  17 ++
 .../storm/trident/spout/ISpoutPartition.java    |  17 ++
 .../jvm/storm/trident/spout/ITridentSpout.java  |  17 ++
 .../OpaquePartitionedTridentSpoutExecutor.java  |  17 ++
 .../spout/PartitionedTridentSpoutExecutor.java  |  17 ++
 .../trident/spout/RichSpoutBatchExecutor.java   |  17 ++
 .../storm/trident/spout/RichSpoutBatchId.java   |  17 ++
 .../spout/RichSpoutBatchIdSerializer.java       |  17 ++
 .../trident/spout/RichSpoutBatchTriggerer.java  |  17 ++
 .../trident/spout/TridentSpoutCoordinator.java  |  17 ++
 .../trident/spout/TridentSpoutExecutor.java     |  17 ++
 .../storm/trident/state/BaseQueryFunction.java  |  17 ++
 .../storm/trident/state/BaseStateUpdater.java   |  17 ++
 .../trident/state/CombinerValueUpdater.java     |  17 ++
 .../storm/trident/state/ITupleCollection.java   |  17 ++
 .../state/JSONNonTransactionalSerializer.java   |  17 ++
 .../trident/state/JSONOpaqueSerializer.java     |  17 ++
 .../state/JSONTransactionalSerializer.java      |  17 ++
 .../jvm/storm/trident/state/OpaqueValue.java    |  17 ++
 .../jvm/storm/trident/state/QueryFunction.java  |  17 ++
 .../jvm/storm/trident/state/ReadOnlyState.java  |  17 ++
 .../trident/state/ReducerValueUpdater.java      |  17 ++
 .../src/jvm/storm/trident/state/Serializer.java |  17 ++
 .../src/jvm/storm/trident/state/State.java      |  17 ++
 .../jvm/storm/trident/state/StateFactory.java   |  17 ++
 .../src/jvm/storm/trident/state/StateSpec.java  |  17 ++
 .../src/jvm/storm/trident/state/StateType.java  |  17 ++
 .../jvm/storm/trident/state/StateUpdater.java   |  17 ++
 .../storm/trident/state/TransactionalValue.java |  17 ++
 .../jvm/storm/trident/state/ValueUpdater.java   |  17 ++
 .../trident/state/map/CachedBatchReadsMap.java  |  17 ++
 .../jvm/storm/trident/state/map/CachedMap.java  |  17 ++
 .../storm/trident/state/map/IBackingMap.java    |  17 ++
 .../state/map/MapCombinerAggStateUpdater.java   |  17 ++
 .../state/map/MapReducerAggStateUpdater.java    |  17 ++
 .../jvm/storm/trident/state/map/MapState.java   |  17 ++
 .../state/map/MicroBatchIBackingMap.java        |  17 ++
 .../trident/state/map/NonTransactionalMap.java  |  17 ++
 .../jvm/storm/trident/state/map/OpaqueMap.java  |  17 ++
 .../trident/state/map/ReadOnlyMapState.java     |  17 ++
 .../trident/state/map/SnapshottableMap.java     |  17 ++
 .../trident/state/map/TransactionalMap.java     |  17 ++
 .../state/snapshot/ReadOnlySnapshottable.java   |  17 ++
 .../trident/state/snapshot/Snapshottable.java   |  17 ++
 .../trident/testing/CountAsAggregator.java      |  17 ++
 .../storm/trident/testing/FeederBatchSpout.java |  17 ++
 .../testing/FeederCommitterBatchSpout.java      |  17 ++
 .../storm/trident/testing/FixedBatchSpout.java  |  17 ++
 .../src/jvm/storm/trident/testing/IFeeder.java  |  17 ++
 .../trident/testing/LRUMemoryMapState.java      |  17 ++
 .../storm/trident/testing/MemoryBackingMap.java |  17 ++
 .../storm/trident/testing/MemoryMapState.java   |  17 ++
 .../storm/trident/testing/MockTridentTuple.java |  17 ++
 .../src/jvm/storm/trident/testing/Split.java    |  17 ++
 .../jvm/storm/trident/testing/StringLength.java |  17 ++
 .../jvm/storm/trident/testing/TrueFilter.java   |  17 ++
 .../jvm/storm/trident/testing/TuplifyArgs.java  |  17 ++
 .../jvm/storm/trident/topology/BatchInfo.java   |  17 ++
 .../trident/topology/ITridentBatchBolt.java     |  17 ++
 .../topology/MasterBatchCoordinator.java        |  17 ++
 .../trident/topology/TransactionAttempt.java    |  17 ++
 .../trident/topology/TridentBoltExecutor.java   |  17 ++
 .../topology/TridentTopologyBuilder.java        |  17 ++
 .../state/RotatingTransactionalState.java       |  17 ++
 .../topology/state/TransactionalState.java      |  17 ++
 .../src/jvm/storm/trident/tuple/ComboList.java  |  17 ++
 .../src/jvm/storm/trident/tuple/ConsList.java   |  17 ++
 .../jvm/storm/trident/tuple/TridentTuple.java   |  17 ++
 .../storm/trident/tuple/TridentTupleView.java   |  17 ++
 .../jvm/storm/trident/tuple/ValuePointer.java   |  17 ++
 .../storm/trident/util/ErrorEdgeFactory.java    |  17 ++
 .../src/jvm/storm/trident/util/IndexedEdge.java |  17 ++
 .../src/jvm/storm/trident/util/LRUMap.java      |  17 ++
 .../jvm/storm/trident/util/TridentUtils.java    |  17 ++
 storm-core/src/multilang/py/storm.py            |  18 ++
 storm-core/src/multilang/rb/storm.rb            |  18 ++
 storm-core/src/storm.thrift                     |  23 ++
 storm-core/src/ui/public/css/style.css          |  17 ++
 storm-core/src/ui/public/js/script.js           |  17 ++
 .../test/clj/backtype/storm/clojure_test.clj    |  15 +
 .../test/clj/backtype/storm/cluster_test.clj    |  15 +
 .../test/clj/backtype/storm/config_test.clj     |  15 +
 .../test/clj/backtype/storm/drpc_test.clj       |  15 +
 .../test/clj/backtype/storm/fields_test.clj     |  15 +
 .../test/clj/backtype/storm/grouping_test.clj   |  15 +
 .../clj/backtype/storm/integration_test.clj     |  15 +
 .../clj/backtype/storm/local_state_test.clj     |  15 +
 .../storm/messaging/netty_integration_test.clj  |  15 +
 .../storm/messaging/netty_unit_test.clj         |  15 +
 .../test/clj/backtype/storm/messaging_test.clj  |  15 +
 .../test/clj/backtype/storm/metrics_test.clj    |  15 +
 .../test/clj/backtype/storm/multilang_test.clj  |  15 +
 .../test/clj/backtype/storm/nimbus_test.clj     |  15 +
 .../test/clj/backtype/storm/scheduler_test.clj  |  15 +
 .../storm/security/auth/AuthUtils_test.clj      |  15 +
 .../storm/security/auth/ReqContext_test.clj     |  15 +
 .../security/auth/SaslTransportPlugin_test.clj  |  15 +
 .../storm/security/auth/ThriftClient_test.clj   |  15 +
 .../storm/security/auth/ThriftServer_test.clj   |  15 +
 .../backtype/storm/security/auth/auth_test.clj  |  15 +
 .../BlowfishTupleSerializer_test.clj            |  15 +
 .../serialization/SerializationFactory_test.clj |  15 +
 .../clj/backtype/storm/serialization_test.clj   |  15 +
 .../clj/backtype/storm/subtopology_test.clj     |  15 +
 .../test/clj/backtype/storm/supervisor_test.clj |  15 +
 .../test/clj/backtype/storm/testing4j_test.clj  |  15 +
 .../test/clj/backtype/storm/tick_tuple_test.clj |  15 +
 .../clj/backtype/storm/transactional_test.clj   |  15 +
 .../test/clj/backtype/storm/tuple_test.clj      |  15 +
 .../test/clj/backtype/storm/utils_test.clj      |  15 +
 .../clj/backtype/storm/versioned_store_test.clj |  15 +
 .../test/clj/storm/trident/integration_test.clj |  15 +
 .../test/clj/storm/trident/state_test.clj       |  15 +
 .../test/clj/storm/trident/tuple_test.clj       |  15 +
 storm-lib/project.clj                           |  15 +
 558 files changed, 9974 insertions(+), 695 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/project.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c2b391a,0000000..d765e71
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@@ -1,204 -1,0 +1,221 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.Config;
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.TaskMessage;
 +import backtype.storm.utils.Utils;
 +import org.jboss.netty.bootstrap.ClientBootstrap;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelFactory;
 +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.InetSocketAddress;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +class Client implements IConnection {
 +    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
 +    private final int max_retries;
 +    private final int base_sleep_ms;
 +    private final int max_sleep_ms;
 +    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
 +    private AtomicReference<Channel> channelRef;
 +    private final ClientBootstrap bootstrap;
 +    private InetSocketAddress remote_addr;
 +    private AtomicInteger retries;
 +    private final Random random = new Random();
 +    private final ChannelFactory factory;
 +    private final int buffer_size;
 +    private final AtomicBoolean being_closed;
 +
 +    @SuppressWarnings("rawtypes")
 +    Client(Map storm_conf, String host, int port) {
 +        message_queue = new LinkedBlockingQueue<Object>();
 +        retries = new AtomicInteger(0);
 +        channelRef = new AtomicReference<Channel>(null);
 +        being_closed = new AtomicBoolean(false);
 +
 +        // Configure
 +        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
 +        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
 +        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
 +        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 +        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
 +
 +        if (maxWorkers > 0) {
 +            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
 +        } else {
 +            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
 +        }
 +        bootstrap = new ClientBootstrap(factory);
 +        bootstrap.setOption("tcpNoDelay", true);
 +        bootstrap.setOption("sendBufferSize", buffer_size);
 +        bootstrap.setOption("keepAlive", true);
 +
 +        // Set up the pipeline factory.
 +        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
 +
 +        // Start the connection attempt.
 +        remote_addr = new InetSocketAddress(host, port);
 +        bootstrap.connect(remote_addr);
 +    }
 +
 +    /**
 +     * We will retry connection with exponential back-off policy
 +     */
 +    void reconnect() {
 +        try {
 +            int tried_count = retries.incrementAndGet();
 +            if (tried_count <= max_retries) {
 +                Thread.sleep(getSleepTimeMs());
 +                LOG.info("Reconnect ... [{}]", tried_count);
 +                bootstrap.connect(remote_addr);
 +                LOG.debug("connection started...");
 +            } else {
 +                LOG.warn("Remote address is not reachable. We will close this client.");
 +                close();
 +            }
 +        } catch (InterruptedException e) {
 +            LOG.warn("connection failed", e);
 +        }
 +    }
 +
 +    /**
 +     * # of milliseconds to wait per exponential back-off policy
 +     */
 +    private int getSleepTimeMs()
 +    {
 +        int backoff = 1 << retries.get();
 +        int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
 +        if ( sleepMs > max_sleep_ms )
 +            sleepMs = max_sleep_ms;
 +        return sleepMs;
 +    }
 +
 +    /**
 +     * Enqueue a task message to be sent to server
 +     */
 +    public void send(int task, byte[] message) {
 +        //throw exception if the client is being closed
 +        if (being_closed.get()) {
 +            throw new RuntimeException("Client is being closed, and does not take requests any more");
 +        }
 +
 +        try {
 +            message_queue.put(new TaskMessage(task, message));
 +        } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Take all enqueued messages from queue
 +     * @return
 +     * @throws InterruptedException
 +     */
 +    MessageBatch takeMessages()  throws InterruptedException {
 +        //1st message
 +        MessageBatch batch = new MessageBatch(buffer_size);
 +        Object msg = message_queue.take();
 +        batch.add(msg);
 +
 +        //we will discard any message after CLOSE
 +        if (msg==ControlMessage.CLOSE_MESSAGE)
 +            return batch;
 +
 +        while (!batch.isFull()) {
 +            //peek the next message
 +            msg = message_queue.peek();
 +            //no more messages
 +            if (msg == null) break;
 +
 +            //we will discard any message after CLOSE
 +            if (msg==ControlMessage.CLOSE_MESSAGE) {
 +                message_queue.take();
 +                batch.add(msg);
 +                break;
 +            }
 +
 +            //try to add this msg into batch
 +            if (!batch.tryAdd((TaskMessage) msg))
 +                break;
 +
 +            //remove this message
 +            message_queue.take();
 +        }
 +
 +        return batch;
 +    }
 +
 +    /**
 +     * gracefully close this client.
 +     *
 +     * We will send all existing requests, and then invoke close_n_release() method
 +     */
 +    public synchronized void close() {
 +        if (!being_closed.get()) {
 +            //enqueue a CLOSE message so that shutdown() will be invoked
 +            try {
 +                message_queue.put(ControlMessage.CLOSE_MESSAGE);
 +                being_closed.set(true);
 +            } catch (InterruptedException e) {
 +                close_n_release();
 +            }
 +        }
 +    }
 +
 +    /**
 +     * close_n_release() is invoked after all messages have been sent.
 +     */
 +    void  close_n_release() {
 +        if (channelRef.get() != null)
 +            channelRef.get().close().awaitUninterruptibly();
 +
 +        //we need to release resources
 +        new Thread(new Runnable() {
 +            @Override
 +            public void run() {
 +                factory.releaseExternalResources();
 +            }}).start();
 +    }
 +
 +    public TaskMessage recv(int flags) {
 +        throw new RuntimeException("Client connection should not receive any messages");
 +    }
 +
 +    void setChannel(Channel channel) {
 +        channelRef.set(channel);
 +        //reset retries
 +        if (channel != null)
 +            retries.set(0);
 +    }
 +
 +}
 +
 +
 +
 +

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 018e0f9,0000000..3e09dd1
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.IContext;
 +
 +import java.util.Map;
 +import java.util.Vector;
 +
 +public class Context implements IContext {
 +    @SuppressWarnings("rawtypes")
 +    private Map storm_conf;
 +    private volatile Vector<IConnection> connections;
 +    
 +    /**
 +     * initialization per Storm configuration 
 +     */
 +    @SuppressWarnings("rawtypes")
 +    public void prepare(Map storm_conf) {
 +       this.storm_conf = storm_conf;
 +       connections = new Vector<IConnection>(); 
 +    }
 +
 +    /**
 +     * establish a server with a binding port
 +     */
 +    public IConnection bind(String storm_id, int port) {
 +        IConnection server = new Server(storm_conf, port);
 +        connections.add(server);
 +        return server;
 +    }
 +
 +    /**
 +     * establish a connection to a remote server
 +     */
 +    public IConnection connect(String storm_id, String host, int port) {        
 +        IConnection client =  new Client(storm_conf, host, port);
 +        connections.add(client);
 +        return client;
 +    }
 +
 +    /**
 +     * terminate this context
 +     */
 +    public void term() {
 +        for (IConnection conn : connections) {
 +            conn.close();
 +        }
 +        connections = null;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index 4cc2040,0000000..a552cf7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.buffer.ChannelBufferOutputStream;
 +import org.jboss.netty.buffer.ChannelBuffers;
 +
 +enum ControlMessage {
 +    CLOSE_MESSAGE((short)-100),
 +    EOB_MESSAGE((short)-201),
 +    OK_RESPONSE((short)-200),
 +    FAILURE_RESPONSE((short)-400);
 +
 +    private short code;
 +
 +    //private constructor
 +    private ControlMessage(short code) {
 +        this.code = code;
 +    }
 +
 +    /**
 +     * Return a control message per an encoded status code
 +     * @param encoded
 +     * @return
 +     */
 +    static ControlMessage mkMessage(short encoded) {
 +        for(ControlMessage cm: ControlMessage.values()) {
 +          if(encoded == cm.code) return cm;
 +        }
 +        return null;
 +    }
 +
 +    int encodeLength() {
 +        return 2; //short
 +    }
 +    
 +    /**
 +     * encode the current Control Message into a channel buffer
 +     * @throws Exception
 +     */
 +    ChannelBuffer buffer() throws Exception {
 +        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
 +        write(bout);
 +        bout.close();
 +        return bout.buffer();
 +    }
 +
 +    void write(ChannelBufferOutputStream bout) throws Exception {
 +        bout.writeShort(code);        
 +    } 
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index a9d46a2,0000000..9d287e4
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@@ -1,151 -1,0 +1,168 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.buffer.ChannelBufferOutputStream;
 +import org.jboss.netty.buffer.ChannelBuffers;
 +
 +import java.util.ArrayList;
 +
 +class MessageBatch {
 +    private int buffer_size;
 +    private ArrayList<Object> msgs;
 +    private int encoded_length;
 +
 +    MessageBatch(int buffer_size) {
 +        this.buffer_size = buffer_size;
 +        msgs = new ArrayList<Object>();
 +        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
 +    }
 +
 +    void add(Object obj) {
 +        if (obj == null)
 +            throw new RuntimeException("null object forbidded in message batch");
 +
 +        if (obj instanceof TaskMessage) {
 +            TaskMessage msg = (TaskMessage)obj;
 +            msgs.add(msg);
 +            encoded_length += msgEncodeLength(msg);
 +            return;
 +        }
 +
 +        if (obj instanceof ControlMessage) {
 +            ControlMessage msg = (ControlMessage)obj;
 +            msgs.add(msg);
 +            encoded_length += msg.encodeLength();
 +            return;
 +        }
 +
 +        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
 +    }
 +
 +    void remove(Object obj) {
 +        if (obj == null) return;
 +
 +        if (obj instanceof TaskMessage) {
 +            TaskMessage msg = (TaskMessage)obj;
 +            msgs.remove(msg);
 +            encoded_length -= msgEncodeLength(msg);
 +            return;
 +        }
 +
 +        if (obj instanceof ControlMessage) {
 +            ControlMessage msg = (ControlMessage)obj;
 +            msgs.remove(msg);
 +            encoded_length -= msg.encodeLength();
 +            return;
 +        }
 +    }
 +
 +    Object get(int index) {
 +        return msgs.get(index);
 +    }
 +
 +    /**
 +     * try to add a TaskMessage to a batch
 +     * @param taskMsg
 +     * @return false if the msg could not be added due to buffer size limit; true otherwise
 +     */
 +    boolean tryAdd(TaskMessage taskMsg) {
 +        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
 +            return false;
 +        add(taskMsg);
 +        return true;
 +    }
 +
 +    private int msgEncodeLength(TaskMessage taskMsg) {
 +        if (taskMsg == null) return 0;
 +
 +        int size = 6; //INT + SHORT
 +        if (taskMsg.message() != null) 
 +            size += taskMsg.message().length;
 +        return size;
 +    }
 +
 +    /**
 +     * Has this batch used up allowed buffer size
 +     * @return
 +     */
 +    boolean isFull() {
 +        return encoded_length >= buffer_size;
 +    }
 +
 +    /**
 +     * true if this batch doesn't have any messages 
 +     * @return
 +     */
 +    boolean isEmpty() {
 +        return msgs.isEmpty();
 +    }
 +
 +    /**
 +     * # of msgs in this batch
 +     * @return
 +     */
 +    int size() {
 +        return msgs.size();
 +    }
 +
 +    /**
 +     * create a buffer containing the encoding of this batch
 +     */
 +    ChannelBuffer buffer() throws Exception {
 +        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
 +        
 +        for (Object msg : msgs) 
 +            if (msg instanceof TaskMessage)
 +                writeTaskMessage(bout, (TaskMessage)msg);
 +            else
 +                ((ControlMessage)msg).write(bout);
 +        
 +        //add a END_OF_BATCH indicator
 +        ControlMessage.EOB_MESSAGE.write(bout);
 +
 +        bout.close();
 +
 +        return bout.buffer();
 +    }
 +
 +    /**
 +     * write a TaskMessage into a stream
 +     *
 +     * Each TaskMessage is encoded as:
 +     *  task ... short(2)
 +     *  len ... int(4)
 +     *  payload ... byte[]     *  
 +     */
 +    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
 +        int payload_len = 0;
 +        if (message.message() != null)
 +            payload_len =  message.message().length;
 +
 +        int task_id = message.task();
 +        if (task_id > Short.MAX_VALUE)
 +            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
 +        
 +        bout.writeShort((short)task_id);
 +        bout.writeInt(payload_len);
 +        if (payload_len >0)
 +            bout.write(message.message());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 76776a9,0000000..3365e58
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@@ -1,68 -1,0 +1,85 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelHandlerContext;
 +import org.jboss.netty.handler.codec.frame.FrameDecoder;
 +
 +public class MessageDecoder extends FrameDecoder {    
 +    /*
 +     * Each ControlMessage is encoded as:
 +     *  code (<0) ... short(2)
 +     * Each TaskMessage is encoded as:
 +     *  task (>=0) ... short(2)
 +     *  len ... int(4)
 +     *  payload ... byte[]     *  
 +     */
 +    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
 +        // Make sure that we have received at least a short 
 +        if (buf.readableBytes() < 2) {
 +            //need more data
 +            return null;
 +        }
 +
 +        // Mark the current buffer position before reading task/len field
 +        // because the whole frame might not be in the buffer yet.
 +        // We will reset the buffer position to the marked position if
 +        // there's not enough bytes in the buffer.
 +        buf.markReaderIndex();
 +
 +        //read the short field
 +        short code = buf.readShort();
 +        
 +        //case 1: Control message
 +        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
 +        if (ctrl_msg != null) return ctrl_msg;
 +        
 +        //case 2: task Message
 +        short task = code;
 +        
 +        // Make sure that we have received at least an integer (length) 
 +        if (buf.readableBytes() < 4) {
 +            //need more data
 +            buf.resetReaderIndex();
 +            return null;
 +        }
 +
 +        // Read the length field.
 +        int length = buf.readInt();
 +        if (length<=0) {
 +            return new TaskMessage(task, null);
 +        }
 +        
 +        // Make sure if there's enough bytes in the buffer.
 +        if (buf.readableBytes() < length) {
 +            // The whole bytes were not received yet - return null.
 +            buf.resetReaderIndex();
 +            return null;
 +        }
 +
 +        // There's enough bytes in the buffer. Read it.
 +        ChannelBuffer payload = buf.readBytes(length);
 +
 +        // Successfully decoded a frame.
 +        // Return a TaskMessage object
 +        return new TaskMessage(task,payload.array());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index c0ac8f1,0000000..e6e65c3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@@ -1,22 -1,0 +1,39 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelHandlerContext;
 +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 +
 +public class MessageEncoder extends OneToOneEncoder {    
 +    @Override
 +    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
 +        if (obj instanceof ControlMessage) {
 +            return ((ControlMessage)obj).buffer();
 +        }
 +
 +        if (obj instanceof MessageBatch) {
 +            return ((MessageBatch)obj).buffer();
 +        } 
 +        
 +        throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
 +    }
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index bf6825c,0000000..ad811b0
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -1,119 -1,0 +1,136 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.Config;
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.TaskMessage;
 +import backtype.storm.utils.Utils;
 +import org.jboss.netty.bootstrap.ServerBootstrap;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelFactory;
 +import org.jboss.netty.channel.group.ChannelGroup;
 +import org.jboss.netty.channel.group.DefaultChannelGroup;
 +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.InetSocketAddress;
 +import java.util.Map;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +class Server implements IConnection {
 +    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
 +    @SuppressWarnings("rawtypes")
 +    Map storm_conf;
 +    int port;
 +    private LinkedBlockingQueue<TaskMessage> message_queue;
 +    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
 +    final ChannelFactory factory;
 +    final ServerBootstrap bootstrap;
 +
 +    @SuppressWarnings("rawtypes")
 +    Server(Map storm_conf, int port) {
 +        this.storm_conf = storm_conf;
 +        this.port = port;
 +        message_queue = new LinkedBlockingQueue<TaskMessage>();
 +
 +        // Configure the server.
 +        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
 +        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 +
 +        if (maxWorkers > 0) {
 +            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
 +        } else {
 +            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
 +        }
 +        bootstrap = new ServerBootstrap(factory);
 +        bootstrap.setOption("child.tcpNoDelay", true);
 +        bootstrap.setOption("child.receiveBufferSize", buffer_size);
 +        bootstrap.setOption("child.keepAlive", true);
 +
 +        // Set up the pipeline factory.
 +        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
 +
 +        // Bind and start to accept incoming connections.
 +        Channel channel = bootstrap.bind(new InetSocketAddress(port));
 +        allChannels.add(channel);
 +    }
 +
 +    /**
 +     * enqueue a received message 
 +     * @param message
 +     * @throws InterruptedException
 +     */
 +    protected void enqueue(TaskMessage message) throws InterruptedException {
 +        message_queue.put(message);
 +        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
 +    }
 +    
 +    /**
 +     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
 +     */
 +    public TaskMessage recv(int flags)  {
 +        if ((flags & 0x01) == 0x01) { 
 +            //non-blocking
 +            return message_queue.poll();
 +        } else {
 +            try {
 +                TaskMessage request = message_queue.take();
 +                LOG.debug("request to be processed: {}", request);
 +                return request;
 +            } catch (InterruptedException e) {
 +                LOG.info("exception within msg receiving", e);
 +                return null;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * register a newly created channel
 +     * @param channel
 +     */
 +    protected void addChannel(Channel channel) {
 +        allChannels.add(channel);
 +    }
 +    
 +    /**
 +     * close a channel
 +     * @param channel
 +     */
 +    protected void closeChannel(Channel channel) {
 +        channel.close().awaitUninterruptibly();
 +        allChannels.remove(channel);
 +    }
 +
 +    /**
 +     * close all channels, and release resources
 +     */
 +    public synchronized void close() {
 +        if (allChannels != null) {  
 +            allChannels.close().awaitUninterruptibly();
 +            factory.releaseExternalResources();
 +            allChannels = null;
 +        }
 +    }
 +
 +    public void send(int task, byte[] message) {
 +        throw new RuntimeException("Server connection should not send any messages");
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 6fbfb1c,0000000..65c36a7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -1,104 -1,0 +1,121 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.ConnectException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +public class StormClientHandler extends SimpleChannelUpstreamHandler  {
 +    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
 +    private Client client;
 +    private AtomicBoolean being_closed;
 +    long start_time; 
 +    
 +    StormClientHandler(Client client) {
 +        this.client = client;
 +        being_closed = new AtomicBoolean(false);
 +        start_time = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
 +        //register the newly established channel
 +        Channel channel = event.getChannel();
 +        client.setChannel(channel);
 +        LOG.debug("connection established to a remote host");
 +        
 +        //send next request
 +        try {
 +            sendRequests(channel, client.takeMessages());
 +        } catch (InterruptedException e) {
 +            channel.close();
 +        }
 +    }
 +
 +    @Override
 +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
 +        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
 +        
 +        //examine the response message from server
 +        ControlMessage msg = (ControlMessage)event.getMessage();
 +        if (msg==ControlMessage.FAILURE_RESPONSE)
 +            LOG.info("failure response:{}", msg);
 +
 +        //send next request
 +        Channel channel = event.getChannel();
 +        try {
 +            sendRequests(channel, client.takeMessages());
 +        } catch (InterruptedException e) {
 +            channel.close();
 +        }
 +    }
 +
 +    /**
 +     * Retrieve a request from message queue, and send to server
 +     * @param channel
 +     */
 +    private void sendRequests(Channel channel, final MessageBatch requests) {
 +        if (requests==null || requests.size()==0 || being_closed.get()) return;
 +
 +        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
 +        Object last_msg = requests.get(requests.size()-1);
 +        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
 +            being_closed.set(true);
 +            requests.remove(last_msg);
 +        }
 +
 +        //we may don't need do anything if no requests found
 +        if (requests.isEmpty()) {
 +            if (being_closed.get())
 +                client.close_n_release();
 +            return;
 +        }
 +
 +        //write request into socket channel
 +        ChannelFuture future = channel.write(requests);
 +        future.addListener(new ChannelFutureListener() {
 +            public void operationComplete(ChannelFuture future)
 +                    throws Exception {
 +                if (!future.isSuccess()) {
 +                    LOG.info("failed to send requests:", future.getCause());
 +                    future.getChannel().close();
 +                } else {
 +                    LOG.debug("{} request(s) sent", requests.size());
 +                }
 +                if (being_closed.get())
 +                    client.close_n_release();
 +            }
 +        });
 +    }
 +
 +    @Override
 +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
 +        Throwable cause = event.getCause();
 +        if (!(cause instanceof ConnectException)) {
 +            LOG.info("Connection failed:", cause);
 +        } 
 +        if (!being_closed.get()) {
 +            client.setChannel(null);
 +            client.reconnect();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 91c513a,0000000..6bad8e3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@@ -1,27 -1,0 +1,44 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.ChannelPipeline;
 +import org.jboss.netty.channel.ChannelPipelineFactory;
 +import org.jboss.netty.channel.Channels;
 +
 +class StormClientPipelineFactory implements ChannelPipelineFactory {
 +    private Client client;
 +
 +    StormClientPipelineFactory(Client client) {
 +        this.client = client;        
 +    }
 +
 +    public ChannelPipeline getPipeline() throws Exception {
 +        // Create a default pipeline implementation.
 +        ChannelPipeline pipeline = Channels.pipeline();
 +
 +        // Decoder
 +        pipeline.addLast("decoder", new MessageDecoder());
 +        // Encoder
 +        pipeline.addLast("encoder", new MessageEncoder());
 +        // business logic.
 +        pipeline.addLast("handler", new StormClientHandler(client));
 +
 +        return pipeline;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 9a5aaed,0000000..093fb61
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@@ -1,53 -1,0 +1,70 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.channel.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +class StormServerHandler extends SimpleChannelUpstreamHandler  {
 +    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
 +    Server server;
 +    private AtomicInteger failure_count; 
 +    
 +    StormServerHandler(Server server) {
 +        this.server = server;
 +        failure_count = new AtomicInteger(0);
 +    }
 +    
 +    @Override
 +    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
 +        server.addChannel(e.getChannel());
 +    }
 +    
 +    @Override
 +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
 +        Object msg = e.getMessage();  
 +        if (msg == null) return;
 +
 +        //end of batch?
 +        if (msg==ControlMessage.EOB_MESSAGE) {
 +            Channel channel = ctx.getChannel();
 +            LOG.debug("Send back response ...");
 +            if (failure_count.get()==0)
 +                channel.write(ControlMessage.OK_RESPONSE);
 +            else channel.write(ControlMessage.FAILURE_RESPONSE);
 +            return;
 +        }
 +        
 +        //enqueue the received message for processing
 +        try {
 +            server.enqueue((TaskMessage)msg);
 +        } catch (InterruptedException e1) {
 +            LOG.info("failed to enqueue a request message", e);
 +            failure_count.incrementAndGet();
 +        }
 +    }
 +
 +    @Override
 +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
 +        server.closeChannel(e.getChannel());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index 56b0834,0000000..df29ba8
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@@ -1,28 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.ChannelPipeline;
 +import org.jboss.netty.channel.ChannelPipelineFactory;
 +import org.jboss.netty.channel.Channels;
 +
 +
 +class StormServerPipelineFactory implements  ChannelPipelineFactory {
 +    private Server server;
 +    
 +    StormServerPipelineFactory(Server server) {
 +        this.server = server;        
 +    }
 +    
 +    public ChannelPipeline getPipeline() throws Exception {
 +        // Create a default pipeline implementation.
 +        ChannelPipeline pipeline = Channels.pipeline();
 +
 +        // Decoder
 +        pipeline.addLast("decoder", new MessageDecoder());
 +        // Encoder
 +        pipeline.addLast("encoder", new MessageEncoder());
 +        // business logic.
 +        pipeline.addLast("handler", new StormServerHandler(server));
 +
 +        return pipeline;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index eefcb48,0000000..0c908c5
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@@ -1,44 -1,0 +1,59 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns backtype.storm.messaging.netty-integration-test
 +  (:use [clojure test])
 +  (:import [backtype.storm.messaging TransportFactory])
 +  (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
 +  (:use [backtype.storm bootstrap testing util]))
 +
 +(bootstrap)
 +
 +(deftest test-integration
 +  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
 +                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
 +                                                    STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
 +                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
 +                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                                                    }]
 +    (let [topology (thrift/mk-topology
 +                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
 +                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
 +                                               :parallelism-hint 6)})
 +          results (complete-topology cluster
 +                                     topology
 +                                     ;; important for test that
 +                                     ;; #tuples = multiple of 4 and 6
 +                                     :storm-conf {TOPOLOGY-WORKERS 3}
 +                                     :mock-sources {"1" [["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ]}
 +                                     )]
 +      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
 +               (read-tuples results "2"))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 12ebe5d,0000000..20914ef
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -1,97 -1,0 +1,112 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns backtype.storm.messaging.netty-unit-test
 +  (:use [clojure test])
 +  (:import [backtype.storm.messaging TransportFactory])
 +  (:use [backtype.storm bootstrap testing util]))
 +
 +(bootstrap)
 +
 +(def port 6700) 
 +(def task 1) 
 +
 +(deftest test-basic
 +  (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
 +        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        server (.bind context nil port)
 +        client (.connect context nil "localhost" port)
 +        _ (.send client task (.getBytes req_msg))
 +        resp (.recv server 0)]
 +    (is (= task (.task resp)))
 +    (is (= req_msg (String. (.message resp))))
 +    (.close client)
 +    (.close server)
 +    (.term context)))    
 +
 +(deftest test-large-msg
 +  (let [req_msg (apply str (repeat 2048000 'c')) 
 +        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        server (.bind context nil port)
 +        client (.connect context nil "localhost" port)
 +        _ (.send client task (.getBytes req_msg))
 +        resp (.recv server 0)]
 +    (is (= task (.task resp)))
 +    (is (= req_msg (String. (.message resp))))
 +    (.close client)
 +    (.close server)
 +    (.term context)))    
 +    
 +(deftest test-server-delayed
 +    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
 +       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        client (.connect context nil "localhost" port)
 +        _ (.send client task (.getBytes req_msg))
 +        _ (Thread/sleep 1000)
 +        server (.bind context nil port)
 +        resp (.recv server 0)]
 +    (is (= task (.task resp)))
 +    (is (= req_msg (String. (.message resp))))
 +    (.close client)
 +    (.close server)
 +    (.term context)))    
 +
 +(deftest test-batch
 +  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        server (.bind context nil port)
 +        client (.connect context nil "localhost" port)]
 +    (doseq [num  (range 1 100000)]
 +      (let [req_msg (str num)]
 +        (.send client task (.getBytes req_msg))))
 +    (doseq [num  (range 1 100000)]
 +      (let [req_msg (str num)
 +            resp (.recv server 0)
 +            resp_msg (String. (.message resp))]
 +        (is (= req_msg resp_msg))))
 +    (.close client)
 +    (.close server)
 +    (.term context)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------


[4/4] git commit: Merge branch 'netty-default'

Posted by pt...@apache.org.
Merge branch 'netty-default'

Conflicts:
	storm-netty/project.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d79f6b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d79f6b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d79f6b80

Branch: refs/heads/master
Commit: d79f6b808cd294b15ec06e41f60b9dbbf1d3986b
Parents: d3414f5 962d520
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 20 21:23:47 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 20 21:23:47 2013 -0500

----------------------------------------------------------------------
 MODULES                                         |   1 -
 bin/install_zmq.sh                              |  48 ----
 conf/defaults.yaml                              |   2 +-
 storm-core/project.clj                          |   2 +-
 .../src/clj/backtype/storm/messaging/zmq.clj    | 108 ---------
 storm-core/src/clj/backtype/storm/testing.clj   |   2 +-
 storm-core/src/clj/zilch/mq.clj                 | 119 ----------
 .../backtype/storm/messaging/netty/Client.java  | 221 ++++++++++++++++++
 .../backtype/storm/messaging/netty/Context.java |  67 ++++++
 .../storm/messaging/netty/ControlMessage.java   |  67 ++++++
 .../storm/messaging/netty/MessageBatch.java     | 168 ++++++++++++++
 .../storm/messaging/netty/MessageDecoder.java   |  85 +++++++
 .../storm/messaging/netty/MessageEncoder.java   |  39 ++++
 .../backtype/storm/messaging/netty/Server.java  | 136 ++++++++++++
 .../messaging/netty/StormClientHandler.java     | 121 ++++++++++
 .../netty/StormClientPipelineFactory.java       |  44 ++++
 .../messaging/netty/StormServerHandler.java     |  70 ++++++
 .../netty/StormServerPipelineFactory.java       |  45 ++++
 .../storm/messaging/netty_integration_test.clj  |  59 +++++
 .../storm/messaging/netty_unit_test.clj         | 112 ++++++++++
 .../test/clj/backtype/storm/messaging_test.clj  |   2 +-
 storm-core/test/clj/zilch/test/mq.clj           | 101 ---------
 storm-netty/project.clj                         |  28 ---
 .../backtype/storm/messaging/netty/Client.java  | 222 -------------------
 .../backtype/storm/messaging/netty/Context.java |  67 ------
 .../storm/messaging/netty/ControlMessage.java   |  68 ------
 .../storm/messaging/netty/MessageBatch.java     | 170 --------------
 .../storm/messaging/netty/MessageDecoder.java   |  85 -------
 .../storm/messaging/netty/MessageEncoder.java   |  39 ----
 .../backtype/storm/messaging/netty/Server.java  | 137 ------------
 .../messaging/netty/StormClientHandler.java     | 128 -----------
 .../netty/StormClientPipelineFactory.java       |  44 ----
 .../messaging/netty/StormServerHandler.java     |  76 -------
 .../netty/StormServerPipelineFactory.java       |  45 ----
 .../storm/messaging/netty_integration_test.clj  |  59 -----
 .../storm/messaging/netty_unit_test.clj         | 112 ----------
 36 files changed, 1238 insertions(+), 1661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/storm-core/project.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------