You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/21 03:26:20 UTC
[1/4] remove 0MQ and replace with netty
Updated Branches:
refs/heads/master d3414f576 -> d79f6b808
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
deleted file mode 100644
index f2b9329..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormClientHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
- private Client client;
- private AtomicBoolean being_closed;
- long start_time;
-
- StormClientHandler(Client client) {
- this.client = client;
- being_closed = new AtomicBoolean(false);
- start_time = System.currentTimeMillis();
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
- //register the newly established channel
- Channel channel = event.getChannel();
- client.setChannel(channel);
- LOG.debug("connection established to a remote host");
-
- //send next request
- try {
- sendRequests(channel, client.takeMessages());
- } catch (InterruptedException e) {
- channel.close();
- }
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
- LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
-
- //examine the response message from server
- ControlMessage msg = (ControlMessage)event.getMessage();
- if (msg==ControlMessage.FAILURE_RESPONSE)
- LOG.info("failure response:{}", msg);
-
- //send next request
- Channel channel = event.getChannel();
- try {
- sendRequests(channel, client.takeMessages());
- } catch (InterruptedException e) {
- channel.close();
- }
- }
-
- /**
- * Retrieve a request from message queue, and send to server
- * @param channel
- */
- private void sendRequests(Channel channel, final MessageBatch requests) {
- if (requests==null || requests.size()==0 || being_closed.get()) return;
-
- //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
- Object last_msg = requests.get(requests.size()-1);
- if (last_msg==ControlMessage.CLOSE_MESSAGE) {
- being_closed.set(true);
- requests.remove(last_msg);
- }
-
- //we may don't need do anything if no requests found
- if (requests.isEmpty()) {
- if (being_closed.get())
- client.close_n_release();
- return;
- }
-
- //write request into socket channel
- ChannelFuture future = channel.write(requests);
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (!future.isSuccess()) {
- LOG.info("failed to send requests:", future.getCause());
- future.getChannel().close();
- } else {
- LOG.debug("{} request(s) sent", requests.size());
- }
- if (being_closed.get())
- client.close_n_release();
- }
- });
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
- Throwable cause = event.getCause();
- if (!(cause instanceof ConnectException)) {
- LOG.info("Connection failed:", cause);
- }
- if (!being_closed.get()) {
- client.setChannel(null);
- client.reconnect();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
deleted file mode 100644
index 91c513a..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-
-class StormClientPipelineFactory implements ChannelPipelineFactory {
- private Client client;
-
- StormClientPipelineFactory(Client client) {
- this.client = client;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
-
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormClientHandler(client));
-
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
deleted file mode 100644
index fd21834..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.messaging.TaskMessage;
-
-class StormServerHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
- Server server;
- private AtomicInteger failure_count;
-
- StormServerHandler(Server server) {
- this.server = server;
- failure_count = new AtomicInteger(0);
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- server.addChannel(e.getChannel());
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msg = e.getMessage();
- if (msg == null) return;
-
- //end of batch?
- if (msg==ControlMessage.EOB_MESSAGE) {
- Channel channel = ctx.getChannel();
- LOG.debug("Send back response ...");
- if (failure_count.get()==0)
- channel.write(ControlMessage.OK_RESPONSE);
- else channel.write(ControlMessage.FAILURE_RESPONSE);
- return;
- }
-
- //enqueue the received message for processing
- try {
- server.enqueue((TaskMessage)msg);
- } catch (InterruptedException e1) {
- LOG.info("failed to enqueue a request message", e);
- failure_count.incrementAndGet();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- server.closeChannel(e.getChannel());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
deleted file mode 100644
index 56b0834..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
- private Server server;
-
- StormServerPipelineFactory(Server server) {
- this.server = server;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
-
- // Decoder
- pipeline.addLast("decoder", new MessageDecoder());
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder());
- // business logic.
- pipeline.addLast("handler", new StormServerHandler(server));
-
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
deleted file mode 100644
index eefcb48..0000000
--- a/storm-netty/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ /dev/null
@@ -1,44 +0,0 @@
-(ns backtype.storm.messaging.netty-integration-test
- (:use [clojure test])
- (:import [backtype.storm.messaging TransportFactory])
- (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
- (:use [backtype.storm bootstrap testing util]))
-
-(bootstrap)
-
-(deftest test-integration
- (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
- :daemon-conf {STORM-LOCAL-MODE-ZMQ true
- STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
- STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- }]
- (let [topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
- {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
- :parallelism-hint 6)})
- results (complete-topology cluster
- topology
- ;; important for test that
- ;; #tuples = multiple of 4 and 6
- :storm-conf {TOPOLOGY-WORKERS 3}
- :mock-sources {"1" [["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ["a"] ["b"]
- ]}
- )]
- (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
- (read-tuples results "2"))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
deleted file mode 100644
index 12ebe5d..0000000
--- a/storm-netty/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ /dev/null
@@ -1,97 +0,0 @@
-(ns backtype.storm.messaging.netty-unit-test
- (:use [clojure test])
- (:import [backtype.storm.messaging TransportFactory])
- (:use [backtype.storm bootstrap testing util]))
-
-(bootstrap)
-
-(def port 6700)
-(def task 1)
-
-(deftest test-basic
- (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
- storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
- STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- }
- context (TransportFactory/makeContext storm-conf)
- server (.bind context nil port)
- client (.connect context nil "localhost" port)
- _ (.send client task (.getBytes req_msg))
- resp (.recv server 0)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
- (.close client)
- (.close server)
- (.term context)))
-
-(deftest test-large-msg
- (let [req_msg (apply str (repeat 2048000 'c'))
- storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
- STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- }
- context (TransportFactory/makeContext storm-conf)
- server (.bind context nil port)
- client (.connect context nil "localhost" port)
- _ (.send client task (.getBytes req_msg))
- resp (.recv server 0)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
- (.close client)
- (.close server)
- (.term context)))
-
-(deftest test-server-delayed
- (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
- storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
- STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- }
- context (TransportFactory/makeContext storm-conf)
- client (.connect context nil "localhost" port)
- _ (.send client task (.getBytes req_msg))
- _ (Thread/sleep 1000)
- server (.bind context nil port)
- resp (.recv server 0)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
- (.close client)
- (.close server)
- (.term context)))
-
-(deftest test-batch
- (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
- STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
- STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
- STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
- STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
- STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
- }
- context (TransportFactory/makeContext storm-conf)
- server (.bind context nil port)
- client (.connect context nil "localhost" port)]
- (doseq [num (range 1 100000)]
- (let [req_msg (str num)]
- (.send client task (.getBytes req_msg))))
- (doseq [num (range 1 100000)]
- (let [req_msg (str num)
- resp (.recv server 0)
- resp_msg (String. (.message resp))]
- (is (= req_msg resp_msg))))
- (.close client)
- (.close server)
- (.term context)))
[2/4] git commit: remove 0MQ and replace with netty
Posted by pt...@apache.org.
remove 0MQ and replace with netty
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b63ed139
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b63ed139
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b63ed139
Branch: refs/heads/master
Commit: b63ed13946eae3acaab4a51bba705124701207eb
Parents: 1bcc169
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Dec 10 11:23:11 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Dec 10 11:23:11 2013 -0500
----------------------------------------------------------------------
MODULES | 1 -
bin/install_zmq.sh | 31 ---
conf/defaults.yaml | 2 +-
storm-core/project.clj | 2 +-
.../src/clj/backtype/storm/messaging/zmq.clj | 93 ---------
storm-core/src/clj/backtype/storm/testing.clj | 2 +-
storm-core/src/clj/zilch/mq.clj | 104 ----------
.../backtype/storm/messaging/netty/Client.java | 204 ++++++++++++++++++
.../backtype/storm/messaging/netty/Context.java | 50 +++++
.../storm/messaging/netty/ControlMessage.java | 50 +++++
.../storm/messaging/netty/MessageBatch.java | 151 ++++++++++++++
.../storm/messaging/netty/MessageDecoder.java | 68 ++++++
.../storm/messaging/netty/MessageEncoder.java | 22 ++
.../backtype/storm/messaging/netty/Server.java | 119 +++++++++++
.../messaging/netty/StormClientHandler.java | 104 ++++++++++
.../netty/StormClientPipelineFactory.java | 27 +++
.../messaging/netty/StormServerHandler.java | 53 +++++
.../netty/StormServerPipelineFactory.java | 28 +++
.../storm/messaging/netty_integration_test.clj | 44 ++++
.../storm/messaging/netty_unit_test.clj | 97 +++++++++
.../test/clj/backtype/storm/messaging_test.clj | 2 +-
storm-core/test/clj/zilch/test/mq.clj | 86 --------
storm-netty/project.clj | 13 --
.../backtype/storm/messaging/netty/Client.java | 205 -------------------
.../backtype/storm/messaging/netty/Context.java | 50 -----
.../storm/messaging/netty/ControlMessage.java | 51 -----
.../storm/messaging/netty/MessageBatch.java | 153 --------------
.../storm/messaging/netty/MessageDecoder.java | 68 ------
.../storm/messaging/netty/MessageEncoder.java | 22 --
.../backtype/storm/messaging/netty/Server.java | 120 -----------
.../messaging/netty/StormClientHandler.java | 111 ----------
.../netty/StormClientPipelineFactory.java | 27 ---
.../messaging/netty/StormServerHandler.java | 59 ------
.../netty/StormServerPipelineFactory.java | 28 ---
.../storm/messaging/netty_integration_test.clj | 44 ----
.../storm/messaging/netty_unit_test.clj | 97 ---------
36 files changed, 1021 insertions(+), 1367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/MODULES
----------------------------------------------------------------------
diff --git a/MODULES b/MODULES
index 76c078a..aa29093 100644
--- a/MODULES
+++ b/MODULES
@@ -1,4 +1,3 @@
storm-console-logging
storm-core
-storm-netty
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/bin/install_zmq.sh
----------------------------------------------------------------------
diff --git a/bin/install_zmq.sh b/bin/install_zmq.sh
deleted file mode 100755
index dc744f1..0000000
--- a/bin/install_zmq.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash
-export JAVA_HOME=${JAVA_HOME:/usr/libexec/java_home}
-
-if [ ! -d "$JAVA_HOME/include" ]; then
- echo "
-Looks like you're missing your 'include' directory. If you're using Mac OS X, You'll need to install the Java dev package.
-
-- Navigate to http://goo.gl/D8lI
-- Click the Java tab on the right
-- Install the appropriate version and try again.
-"
- exit -1;
-fi
-
-#install zeromq
-wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
-tar -xzf zeromq-2.1.7.tar.gz
-cd zeromq-2.1.7
-./configure
-make
-sudo make install
-
-cd ../
-
-#install jzmq (both native and into local maven cache)
-git clone https://github.com/nathanmarz/jzmq.git
-cd jzmq
-./autogen.sh
-./configure
-make
-sudo make install
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index a5b31f4..35e7b00 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -18,7 +18,7 @@ storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
-storm.messaging.transport: "backtype.storm.messaging.zmq"
+storm.messaging.transport: "backtype.storm.messaging.netty.Context"
### nimbus.* configs are for the master
nimbus.host: "localhost"
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/project.clj
----------------------------------------------------------------------
diff --git a/storm-core/project.clj b/storm-core/project.clj
index 0eaa6a3..1b7226a 100644
--- a/storm-core/project.clj
+++ b/storm-core/project.clj
@@ -10,7 +10,6 @@
[clj-time "0.4.1"]
[com.netflix.curator/curator-framework "1.0.1"
:exclusions [log4j/log4j]]
- [backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
[hiccup "0.3.6"]
@@ -27,6 +26,7 @@
[com.google.guava/guava "13.0"]
[ch.qos.logback/logback-classic "1.0.6"]
[org.slf4j/log4j-over-slf4j "1.6.6"]
+ [io.netty/netty "3.6.3.Final"]
]
:source-paths ["src/clj"]
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/messaging/zmq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/zmq.clj b/storm-core/src/clj/backtype/storm/messaging/zmq.clj
deleted file mode 100644
index 23e263e..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/zmq.clj
+++ /dev/null
@@ -1,93 +0,0 @@
-(ns backtype.storm.messaging.zmq
- (:refer-clojure :exclude [send])
- (:use [backtype.storm config log])
- (:import [backtype.storm.messaging IContext IConnection TaskMessage])
- (:import [java.nio ByteBuffer])
- (:import [org.zeromq ZMQ])
- (:import [java.util Map])
- (:require [zilch.mq :as mq])
- (:gen-class
- :methods [^{:static true} [makeContext [java.util.Map] backtype.storm.messaging.IContext]]))
-
-(defn mk-packet [task ^bytes message]
- (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
- (.putShort bb (short task))
- (.put bb message)
- (.array bb)
- ))
-
-(defn parse-packet [^bytes packet]
- (let [bb (ByteBuffer/wrap packet)
- port (.getShort bb)
- msg (byte-array (- (count packet) 2))]
- (.get bb msg)
- (TaskMessage. (int port) msg)
- ))
-
-(defn get-bind-zmq-url [local? port]
- (if local?
- (str "ipc://" port ".ipc")
- (str "tcp://*:" port)))
-
-(defn get-connect-zmq-url [local? host port]
- (if local?
- (str "ipc://" port ".ipc")
- (str "tcp://" host ":" port)))
-
-
-(defprotocol ZMQContextQuery
- (zmq-context [this]))
-
-(deftype ZMQConnection [socket]
- IConnection
- (^TaskMessage recv [this ^int flags]
- (require 'backtype.storm.messaging.zmq)
- (if-let [packet (mq/recv socket flags)]
- (parse-packet packet)))
- (^void send [this ^int taskId ^bytes payload]
- (require 'backtype.storm.messaging.zmq)
- (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
- (^void close [this]
- (.close socket)))
-
-(defn mk-connection [socket]
- (ZMQConnection. socket))
-
-(deftype ZMQContext [^{:unsynchronized-mutable true} context
- ^{:unsynchronized-mutable true} linger-ms
- ^{:unsynchronized-mutable true} hwm
- ^{:unsynchronized-mutable true} local?]
- IContext
- (^void prepare [this ^Map storm-conf]
- (let [num-threads (.get storm-conf ZMQ-THREADS)]
- (set! context (mq/context num-threads))
- (set! linger-ms (.get storm-conf ZMQ-LINGER-MILLIS))
- (set! hwm (.get storm-conf ZMQ-HWM))
- (set! local? (= (.get storm-conf STORM-CLUSTER-MODE) "local"))))
- (^IConnection bind [this ^String storm-id ^int port]
- (require 'backtype.storm.messaging.zmq)
- (-> context
- (mq/socket mq/pull)
- (mq/set-hwm hwm)
- (mq/bind (get-bind-zmq-url local? port))
- mk-connection
- ))
- (^IConnection connect [this ^String storm-id ^String host ^int port]
- (require 'backtype.storm.messaging.zmq)
- (-> context
- (mq/socket mq/push)
- (mq/set-hwm hwm)
- (mq/set-linger linger-ms)
- (mq/connect (get-connect-zmq-url local? host port))
- mk-connection))
- (^void term [this]
- (.term context))
-
- ZMQContextQuery
- (zmq-context [this]
- context))
-
-(defn -makeContext [^Map storm-conf]
- (let [context (ZMQContext. nil 0 0 true)]
- (.prepare context storm-conf)
- context))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 700dce6..a17743a 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -96,7 +96,7 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
daemon-conf (merge (read-storm-config)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/zilch/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/zilch/mq.clj b/storm-core/src/clj/zilch/mq.clj
deleted file mode 100644
index 27c2094..0000000
--- a/storm-core/src/clj/zilch/mq.clj
+++ /dev/null
@@ -1,104 +0,0 @@
-;; Copyright 2011 Tim Dysinger
-
-;; Licensed under the Apache License, Version 2.0 (the "License");
-;; you may not use this file except in compliance with the License.
-;; You may obtain a copy of the License at
-
-;; http://www.apache.org/licenses/LICENSE-2.0
-
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns zilch.mq
- (:refer-clojure :exclude [send])
- )
-
-(defmacro zeromq-imports []
- '(do
- (import '[org.zeromq ZMQ ZMQ$Context ZMQ$Socket])
- ))
-
-(zeromq-imports)
-
-(defn ^ZMQ$Context context [threads]
- (ZMQ/context threads))
-
-(defmacro with-context
- [id threads & body]
- `(let [~id (context ~threads)]
- (try ~@body
- (finally (.term ~id)))))
-
-(def sndmore ZMQ/SNDMORE)
-
-(def req ZMQ/REQ)
-(def rep ZMQ/REP)
-(def xreq ZMQ/XREQ)
-(def xrep ZMQ/XREP)
-(def pub ZMQ/PUB)
-(def sub ZMQ/SUB)
-(def pair ZMQ/PAIR)
-(def push ZMQ/PUSH)
-(def pull ZMQ/PULL)
-
-(defn ^bytes barr [& arr]
- (byte-array (map byte arr)))
-
-(defn ^ZMQ$Socket socket
- [^ZMQ$Context context type]
- (.socket context type))
-
-(defn set-linger
- [^ZMQ$Socket socket linger-ms]
- (doto socket
- (.setLinger (long linger-ms))))
-
-(defn set-hwm
- [^ZMQ$Socket socket hwm]
- (if hwm
- (doto socket
- (.setHWM (long hwm)))
- socket
- ))
-
-(defn bind
- [^ZMQ$Socket socket url]
- (doto socket
- (.bind url)))
-
-(defn connect
- [^ZMQ$Socket socket url]
- (doto socket
- (.connect url)))
-
-(defn subscribe
- ([^ZMQ$Socket socket ^bytes topic]
- (doto socket
- (.subscribe topic)))
- ([^ZMQ$Socket socket]
- (subscribe socket (byte-array []))))
-
-(defn unsubscribe
- ([^ZMQ$Socket socket ^bytes topic]
- (doto socket
- (.unsubscribe (.getBytes topic))))
- ([^ZMQ$Socket socket]
- (unsubscribe socket "")))
-
-(defn send
- ([^ZMQ$Socket socket ^bytes message flags]
- (.send socket message flags))
- ([^ZMQ$Socket socket ^bytes message]
- (send socket message ZMQ/NOBLOCK)))
-
-(defn recv-more? [^ZMQ$Socket socket]
- (.hasReceiveMore socket))
-
-(defn recv
- ([^ZMQ$Socket socket flags]
- (.recv socket flags))
- ([^ZMQ$Socket socket]
- (recv socket 0)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
new file mode 100644
index 0000000..c2b391a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -0,0 +1,204 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class Client implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ private final int max_retries;
+ private final int base_sleep_ms;
+ private final int max_sleep_ms;
+ private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
+ private AtomicReference<Channel> channelRef;
+ private final ClientBootstrap bootstrap;
+ private InetSocketAddress remote_addr;
+ private AtomicInteger retries;
+ private final Random random = new Random();
+ private final ChannelFactory factory;
+ private final int buffer_size;
+ private final AtomicBoolean being_closed;
+
+ @SuppressWarnings("rawtypes")
+ Client(Map storm_conf, String host, int port) {
+ message_queue = new LinkedBlockingQueue<Object>();
+ retries = new AtomicInteger(0);
+ channelRef = new AtomicReference<Channel>(null);
+ being_closed = new AtomicBoolean(false);
+
+ // Configure
+ buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+ base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+ max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("sendBufferSize", buffer_size);
+ bootstrap.setOption("keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+
+ // Start the connection attempt.
+ remote_addr = new InetSocketAddress(host, port);
+ bootstrap.connect(remote_addr);
+ }
+
+ /**
+ * We will retry connection with exponential back-off policy
+ */
+ void reconnect() {
+ try {
+ int tried_count = retries.incrementAndGet();
+ if (tried_count <= max_retries) {
+ Thread.sleep(getSleepTimeMs());
+ LOG.info("Reconnect ... [{}]", tried_count);
+ bootstrap.connect(remote_addr);
+ LOG.debug("connection started...");
+ } else {
+ LOG.warn("Remote address is not reachable. We will close this client.");
+ close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("connection failed", e);
+ }
+ }
+
+ /**
+ * # of milliseconds to wait per exponential back-off policy
+ */
+ private int getSleepTimeMs()
+ {
+ int backoff = 1 << retries.get();
+ int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
+ if ( sleepMs > max_sleep_ms )
+ sleepMs = max_sleep_ms;
+ return sleepMs;
+ }
+
+ /**
+ * Enqueue a task message to be sent to server
+ */
+ public void send(int task, byte[] message) {
+ //throw exception if the client is being closed
+ if (being_closed.get()) {
+ throw new RuntimeException("Client is being closed, and does not take requests any more");
+ }
+
+ try {
+ message_queue.put(new TaskMessage(task, message));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Take all enqueued messages from queue
+ * @return
+ * @throws InterruptedException
+ */
+ MessageBatch takeMessages() throws InterruptedException {
+ //1st message
+ MessageBatch batch = new MessageBatch(buffer_size);
+ Object msg = message_queue.take();
+ batch.add(msg);
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE)
+ return batch;
+
+ while (!batch.isFull()) {
+ //peek the next message
+ msg = message_queue.peek();
+ //no more messages
+ if (msg == null) break;
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE) {
+ message_queue.take();
+ batch.add(msg);
+ break;
+ }
+
+ //try to add this msg into batch
+ if (!batch.tryAdd((TaskMessage) msg))
+ break;
+
+ //remove this message
+ message_queue.take();
+ }
+
+ return batch;
+ }
+
+ /**
+ * gracefully close this client.
+ *
+ * We will send all existing requests, and then invoke close_n_release() method
+ */
+ public synchronized void close() {
+ if (!being_closed.get()) {
+ //enqueue a CLOSE message so that shutdown() will be invoked
+ try {
+ message_queue.put(ControlMessage.CLOSE_MESSAGE);
+ being_closed.set(true);
+ } catch (InterruptedException e) {
+ close_n_release();
+ }
+ }
+ }
+
+ /**
+ * close_n_release() is invoked after all messages have been sent.
+ */
+ void close_n_release() {
+ if (channelRef.get() != null)
+ channelRef.get().close().awaitUninterruptibly();
+
+ //we need to release resources
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ factory.releaseExternalResources();
+ }}).start();
+ }
+
+ public TaskMessage recv(int flags) {
+ throw new RuntimeException("Client connection should not receive any messages");
+ }
+
+ void setChannel(Channel channel) {
+ channelRef.set(channel);
+ //reset retries
+ if (channel != null)
+ retries.set(0);
+ }
+
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
new file mode 100644
index 0000000..018e0f9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IContext;
+
+import java.util.Map;
+import java.util.Vector;
+
+public class Context implements IContext {
+ @SuppressWarnings("rawtypes")
+ private Map storm_conf;
+ private volatile Vector<IConnection> connections;
+
+ /**
+ * initialization per Storm configuration
+ */
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map storm_conf) {
+ this.storm_conf = storm_conf;
+ connections = new Vector<IConnection>();
+ }
+
+ /**
+ * establish a server with a binding port
+ */
+ public IConnection bind(String storm_id, int port) {
+ IConnection server = new Server(storm_conf, port);
+ connections.add(server);
+ return server;
+ }
+
+ /**
+ * establish a connection to a remote server
+ */
+ public IConnection connect(String storm_id, String host, int port) {
+ IConnection client = new Client(storm_conf, host, port);
+ connections.add(client);
+ return client;
+ }
+
+ /**
+ * terminate this context
+ */
+ public void term() {
+ for (IConnection conn : connections) {
+ conn.close();
+ }
+ connections = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
new file mode 100644
index 0000000..4cc2040
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+enum ControlMessage {
+ CLOSE_MESSAGE((short)-100),
+ EOB_MESSAGE((short)-201),
+ OK_RESPONSE((short)-200),
+ FAILURE_RESPONSE((short)-400);
+
+ private short code;
+
+ //private constructor
+ private ControlMessage(short code) {
+ this.code = code;
+ }
+
+ /**
+ * Return a control message per an encoded status code
+ * @param encoded
+ * @return
+ */
+ static ControlMessage mkMessage(short encoded) {
+ for(ControlMessage cm: ControlMessage.values()) {
+ if(encoded == cm.code) return cm;
+ }
+ return null;
+ }
+
+ int encodeLength() {
+ return 2; //short
+ }
+
+ /**
+ * encode the current Control Message into a channel buffer
+ * @throws Exception
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
+ write(bout);
+ bout.close();
+ return bout.buffer();
+ }
+
+ void write(ChannelBufferOutputStream bout) throws Exception {
+ bout.writeShort(code);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
new file mode 100644
index 0000000..a9d46a2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -0,0 +1,151 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+ private int buffer_size;
+ private ArrayList<Object> msgs;
+ private int encoded_length;
+
+ MessageBatch(int buffer_size) {
+ this.buffer_size = buffer_size;
+ msgs = new ArrayList<Object>();
+ encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+ }
+
+ void add(Object obj) {
+ if (obj == null)
+ throw new RuntimeException("null object forbidded in message batch");
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.add(msg);
+ encoded_length += msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.add(msg);
+ encoded_length += msg.encodeLength();
+ return;
+ }
+
+ throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
+ }
+
+ void remove(Object obj) {
+ if (obj == null) return;
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msg.encodeLength();
+ return;
+ }
+ }
+
+ Object get(int index) {
+ return msgs.get(index);
+ }
+
+ /**
+ * try to add a TaskMessage to a batch
+ * @param taskMsg
+ * @return false if the msg could not be added due to buffer size limit; true otherwise
+ */
+ boolean tryAdd(TaskMessage taskMsg) {
+ if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
+ return false;
+ add(taskMsg);
+ return true;
+ }
+
+ private int msgEncodeLength(TaskMessage taskMsg) {
+ if (taskMsg == null) return 0;
+
+ int size = 6; //INT + SHORT
+ if (taskMsg.message() != null)
+ size += taskMsg.message().length;
+ return size;
+ }
+
+ /**
+ * Has this batch used up allowed buffer size
+ * @return
+ */
+ boolean isFull() {
+ return encoded_length >= buffer_size;
+ }
+
+ /**
+ * true if this batch doesn't have any messages
+ * @return
+ */
+ boolean isEmpty() {
+ return msgs.isEmpty();
+ }
+
+ /**
+ * # of msgs in this batch
+ * @return
+ */
+ int size() {
+ return msgs.size();
+ }
+
+ /**
+ * create a buffer containing the encoding of this batch
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+
+ for (Object msg : msgs)
+ if (msg instanceof TaskMessage)
+ writeTaskMessage(bout, (TaskMessage)msg);
+ else
+ ((ControlMessage)msg).write(bout);
+
+ //add a END_OF_BATCH indicator
+ ControlMessage.EOB_MESSAGE.write(bout);
+
+ bout.close();
+
+ return bout.buffer();
+ }
+
+ /**
+ * write a TaskMessage into a stream
+ *
+ * Each TaskMessage is encoded as:
+ * task ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+ int payload_len = 0;
+ if (message.message() != null)
+ payload_len = message.message().length;
+
+ int task_id = message.task();
+ if (task_id > Short.MAX_VALUE)
+ throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+
+ bout.writeShort((short)task_id);
+ bout.writeInt(payload_len);
+ if (payload_len >0)
+ bout.write(message.message());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
new file mode 100644
index 0000000..76776a9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -0,0 +1,68 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class MessageDecoder extends FrameDecoder {
+ /*
+ * Each ControlMessage is encoded as:
+ * code (<0) ... short(2)
+ * Each TaskMessage is encoded as:
+ * task (>=0) ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+ // Make sure that we have received at least a short
+ if (buf.readableBytes() < 2) {
+ //need more data
+ return null;
+ }
+
+ // Mark the current buffer position before reading task/len field
+ // because the whole frame might not be in the buffer yet.
+ // We will reset the buffer position to the marked position if
+ // there's not enough bytes in the buffer.
+ buf.markReaderIndex();
+
+ //read the short field
+ short code = buf.readShort();
+
+ //case 1: Control message
+ ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+ if (ctrl_msg != null) return ctrl_msg;
+
+ //case 2: task Message
+ short task = code;
+
+ // Make sure that we have received at least an integer (length)
+ if (buf.readableBytes() < 4) {
+ //need more data
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // Read the length field.
+ int length = buf.readInt();
+ if (length<=0) {
+ return new TaskMessage(task, null);
+ }
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+ // The whole bytes were not received yet - return null.
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // There's enough bytes in the buffer. Read it.
+ ChannelBuffer payload = buf.readBytes(length);
+
+ // Successfully decoded a frame.
+ // Return a TaskMessage object
+ return new TaskMessage(task,payload.array());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
new file mode 100644
index 0000000..c0ac8f1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@ -0,0 +1,22 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+ if (obj instanceof ControlMessage) {
+ return ((ControlMessage)obj).buffer();
+ }
+
+ if (obj instanceof MessageBatch) {
+ return ((MessageBatch)obj).buffer();
+ }
+
+ throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
new file mode 100644
index 0000000..bf6825c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -0,0 +1,119 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class Server implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Server.class);
+ @SuppressWarnings("rawtypes")
+ Map storm_conf;
+ int port;
+ private LinkedBlockingQueue<TaskMessage> message_queue;
+ volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
+ final ChannelFactory factory;
+ final ServerBootstrap bootstrap;
+
+ @SuppressWarnings("rawtypes")
+ Server(Map storm_conf, int port) {
+ this.storm_conf = storm_conf;
+ this.port = port;
+ message_queue = new LinkedBlockingQueue<TaskMessage>();
+
+ // Configure the server.
+ int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ServerBootstrap(factory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.receiveBufferSize", buffer_size);
+ bootstrap.setOption("child.keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+
+ // Bind and start to accept incoming connections.
+ Channel channel = bootstrap.bind(new InetSocketAddress(port));
+ allChannels.add(channel);
+ }
+
+ /**
+ * enqueue a received message
+ * @param message
+ * @throws InterruptedException
+ */
+ protected void enqueue(TaskMessage message) throws InterruptedException {
+ message_queue.put(message);
+ LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
+ }
+
+ /**
+ * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
+ */
+ public TaskMessage recv(int flags) {
+ if ((flags & 0x01) == 0x01) {
+ //non-blocking
+ return message_queue.poll();
+ } else {
+ try {
+ TaskMessage request = message_queue.take();
+ LOG.debug("request to be processed: {}", request);
+ return request;
+ } catch (InterruptedException e) {
+ LOG.info("exception within msg receiving", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * register a newly created channel
+ * @param channel
+ */
+ protected void addChannel(Channel channel) {
+ allChannels.add(channel);
+ }
+
+ /**
+ * close a channel
+ * @param channel
+ */
+ protected void closeChannel(Channel channel) {
+ channel.close().awaitUninterruptibly();
+ allChannels.remove(channel);
+ }
+
+ /**
+ * close all channels, and release resources
+ */
+ public synchronized void close() {
+ if (allChannels != null) {
+ allChannels.close().awaitUninterruptibly();
+ factory.releaseExternalResources();
+ allChannels = null;
+ }
+ }
+
+ public void send(int task, byte[] message) {
+ throw new RuntimeException("Server connection should not send any messages");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..6fbfb1c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,104 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+ private Client client;
+ private AtomicBoolean being_closed;
+ long start_time;
+
+ StormClientHandler(Client client) {
+ this.client = client;
+ being_closed = new AtomicBoolean(false);
+ start_time = System.currentTimeMillis();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
+ //register the newly established channel
+ Channel channel = event.getChannel();
+ client.setChannel(channel);
+ LOG.debug("connection established to a remote host");
+
+ //send next request
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+ LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
+
+ //examine the response message from server
+ ControlMessage msg = (ControlMessage)event.getMessage();
+ if (msg==ControlMessage.FAILURE_RESPONSE)
+ LOG.info("failure response:{}", msg);
+
+ //send next request
+ Channel channel = event.getChannel();
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ /**
+ * Retrieve a request from message queue, and send to server
+ * @param channel
+ */
+ private void sendRequests(Channel channel, final MessageBatch requests) {
+ if (requests==null || requests.size()==0 || being_closed.get()) return;
+
+ //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
+ Object last_msg = requests.get(requests.size()-1);
+ if (last_msg==ControlMessage.CLOSE_MESSAGE) {
+ being_closed.set(true);
+ requests.remove(last_msg);
+ }
+
+ //we may don't need do anything if no requests found
+ if (requests.isEmpty()) {
+ if (being_closed.get())
+ client.close_n_release();
+ return;
+ }
+
+ //write request into socket channel
+ ChannelFuture future = channel.write(requests);
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (!future.isSuccess()) {
+ LOG.info("failed to send requests:", future.getCause());
+ future.getChannel().close();
+ } else {
+ LOG.debug("{} request(s) sent", requests.size());
+ }
+ if (being_closed.get())
+ client.close_n_release();
+ }
+ });
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+ Throwable cause = event.getCause();
+ if (!(cause instanceof ConnectException)) {
+ LOG.info("Connection failed:", cause);
+ }
+ if (!being_closed.get()) {
+ client.setChannel(null);
+ client.reconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
new file mode 100644
index 0000000..91c513a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -0,0 +1,27 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+class StormClientPipelineFactory implements ChannelPipelineFactory {
+ private Client client;
+
+ StormClientPipelineFactory(Client client) {
+ this.client = client;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormClientHandler(client));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
new file mode 100644
index 0000000..9a5aaed
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -0,0 +1,53 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class StormServerHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+ Server server;
+ private AtomicInteger failure_count;
+
+ StormServerHandler(Server server) {
+ this.server = server;
+ failure_count = new AtomicInteger(0);
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ server.addChannel(e.getChannel());
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ Object msg = e.getMessage();
+ if (msg == null) return;
+
+ //end of batch?
+ if (msg==ControlMessage.EOB_MESSAGE) {
+ Channel channel = ctx.getChannel();
+ LOG.debug("Send back response ...");
+ if (failure_count.get()==0)
+ channel.write(ControlMessage.OK_RESPONSE);
+ else channel.write(ControlMessage.FAILURE_RESPONSE);
+ return;
+ }
+
+ //enqueue the received message for processing
+ try {
+ server.enqueue((TaskMessage)msg);
+ } catch (InterruptedException e1) {
+ LOG.info("failed to enqueue a request message", e);
+ failure_count.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ server.closeChannel(e.getChannel());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
new file mode 100644
index 0000000..56b0834
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -0,0 +1,28 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+class StormServerPipelineFactory implements ChannelPipelineFactory {
+ private Server server;
+
+ StormServerPipelineFactory(Server server) {
+ this.server = server;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
new file mode 100644
index 0000000..eefcb48
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -0,0 +1,44 @@
+(ns backtype.storm.messaging.netty-integration-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(deftest test-integration
+ (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
+ :daemon-conf {STORM-LOCAL-MODE-ZMQ true
+ STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ }]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+ {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+ :parallelism-hint 6)})
+ results (complete-topology cluster
+ topology
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ :storm-conf {TOPOLOGY-WORKERS 3}
+ :mock-sources {"1" [["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ]}
+ )]
+ (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+ (read-tuples results "2"))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
new file mode 100644
index 0000000..12ebe5d
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -0,0 +1,97 @@
+(ns backtype.storm.messaging.netty-unit-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(def port 6700)
+(def task 1)
+
+(deftest test-basic
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-large-msg
+ (let [req_msg (apply str (repeat 2048000 'c'))
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-server-delayed
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ _ (Thread/sleep 1000)
+ server (.bind context nil port)
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-batch
+ (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)]
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)]
+ (.send client task (.getBytes req_msg))))
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)
+ resp (.recv server 0)
+ resp_msg (String. (.message resp))]
+ (is (= req_msg resp_msg))))
+ (.close client)
+ (.close server)
+ (.term context)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 6b44ea1..3c61cec 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -14,7 +14,7 @@
STORM-LOCAL-MODE-ZMQ
(if transport-on? true false)
STORM-MESSAGING-TRANSPORT
- "backtype.storm.messaging.zmq"}]
+ "backtype.storm.messaging.netty.Context"}]
(let [topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)}
{"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/zilch/test/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/zilch/test/mq.clj b/storm-core/test/clj/zilch/test/mq.clj
deleted file mode 100644
index 756d29b..0000000
--- a/storm-core/test/clj/zilch/test/mq.clj
+++ /dev/null
@@ -1,86 +0,0 @@
-(ns zilch.test.mq
- (:use clojure.test)
- (:import [java.util Arrays UUID])
- (:require [zilch.mq :as mq]))
-
-(defn uuid [] (str (UUID/randomUUID)))
-
-(defn random-msg []
- (byte-array (map byte (for [i (range (Integer. (int (rand 100))))]
- (Integer. (int (rand 100)))
- ))))
-
-(def url
- (str "inproc://" (uuid))
- ;; (str "ipc://" (uuid))
- ;; (str "tcp://127.0.0.1:" (+ 4000 (Math/round (rand 1000)))))
- )
-
-(deftest zilch
- (testing "zilch"
- (testing "should be able to"
-
- (testing "push / pull"
- (mq/with-context context 2
- (with-open [s0 (-> context
- (mq/socket mq/pull)
- (mq/bind url))
- s1 (-> context
- (mq/socket mq/push)
- (mq/connect url))]
- (let [msg (random-msg)
- push (future (mq/send s1 msg))
- pull (future (mq/recv s0))]
- (is (Arrays/equals msg @pull))))))
-
- (testing "pub / sub"
- (mq/with-context context 2
- (with-open [s0 (-> context
- (mq/socket mq/pub)
- (mq/bind url))
- s1 (-> context
- (mq/socket mq/sub)
- (mq/subscribe)
- (mq/connect url))]
- (let [msg (random-msg)
- pub (future (mq/send s0 msg))
- sub (future (mq/recv s1))]
- (is (Arrays/equals msg @sub))))))
-
- (testing "pair / pair"
- (mq/with-context context 2
- (with-open [s0 (-> context
- (mq/socket mq/pair)
- (mq/bind url))
- s1 (-> context
- (mq/socket mq/pair)
- (mq/connect url))]
- (let [msg0 (random-msg)
- pair0 (future (mq/send s0 msg0)
- (mq/recv s0))
- msg1 (random-msg)
- pair1 (future (mq/send s1 msg1)
- (mq/recv s1))]
- (is (Arrays/equals msg1 @pair0))
- (is (Arrays/equals msg0 @pair1))))))
-
- (testing "req / rep"
- (mq/with-context context 2
- (with-open [s0 (-> context
- (mq/socket mq/rep)
- (mq/bind url))
- s1 (-> context
- (mq/socket mq/req)
- (mq/connect url))]
- (let [msg (random-msg)
- req (future (mq/send s1 msg)
- (mq/recv s1))
- rep (future (mq/recv s0)
- (mq/send s0 msg))]
- (is (Arrays/equals msg @req))))))
-
- (testing "req / xrep")
-
- (testing "xreq / rep")
-
- (testing "xreq / xrep"))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/project.clj
----------------------------------------------------------------------
diff --git a/storm-netty/project.clj b/storm-netty/project.clj
deleted file mode 100644
index 24905bf..0000000
--- a/storm-netty/project.clj
+++ /dev/null
@@ -1,13 +0,0 @@
-(def ROOT-DIR (subs *file* 0 (- (count *file*) (count "project.clj"))))
-(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp (.trim)))
-
-(eval `(defproject storm/storm-netty ~VERSION
- :dependencies [[storm/storm-core ~VERSION]
- [io.netty/netty "3.6.3.Final"]]
- :java-source-paths ["src/jvm"]
- :test-paths ["test/clj"]
- :profiles {:release {}}
- :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
- :target-path "target"
- :javac-options ["-target" "1.6" "-source" "1.6"]
- :aot :all))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
deleted file mode 100644
index 91e4bd4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Client implements IConnection {
- private static final Logger LOG = LoggerFactory.getLogger(Client.class);
- private final int max_retries;
- private final int base_sleep_ms;
- private final int max_sleep_ms;
- private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
- private AtomicReference<Channel> channelRef;
- private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
- private AtomicInteger retries;
- private final Random random = new Random();
- private final ChannelFactory factory;
- private final int buffer_size;
- private final AtomicBoolean being_closed;
-
- @SuppressWarnings("rawtypes")
- Client(Map storm_conf, String host, int port) {
- message_queue = new LinkedBlockingQueue<Object>();
- retries = new AtomicInteger(0);
- channelRef = new AtomicReference<Channel>(null);
- being_closed = new AtomicBoolean(false);
-
- // Configure
- buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
- max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
- base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
- max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
- int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-
- if (maxWorkers > 0) {
- factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
- } else {
- factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- }
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("sendBufferSize", buffer_size);
- bootstrap.setOption("keepAlive", true);
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
-
- // Start the connection attempt.
- remote_addr = new InetSocketAddress(host, port);
- bootstrap.connect(remote_addr);
- }
-
- /**
- * We will retry connection with exponential back-off policy
- */
- void reconnect() {
- try {
- int tried_count = retries.incrementAndGet();
- if (tried_count <= max_retries) {
- Thread.sleep(getSleepTimeMs());
- LOG.info("Reconnect ... [{}]", tried_count);
- bootstrap.connect(remote_addr);
- LOG.debug("connection started...");
- } else {
- LOG.warn("Remote address is not reachable. We will close this client.");
- close();
- }
- } catch (InterruptedException e) {
- LOG.warn("connection failed", e);
- }
- }
-
- /**
- * # of milliseconds to wait per exponential back-off policy
- */
- private int getSleepTimeMs()
- {
- int backoff = 1 << retries.get();
- int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
- if ( sleepMs > max_sleep_ms )
- sleepMs = max_sleep_ms;
- return sleepMs;
- }
-
- /**
- * Enqueue a task message to be sent to server
- */
- public void send(int task, byte[] message) {
- //throw exception if the client is being closed
- if (being_closed.get()) {
- throw new RuntimeException("Client is being closed, and does not take requests any more");
- }
-
- try {
- message_queue.put(new TaskMessage(task, message));
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Take all enqueued messages from queue
- * @return
- * @throws InterruptedException
- */
- MessageBatch takeMessages() throws InterruptedException {
- //1st message
- MessageBatch batch = new MessageBatch(buffer_size);
- Object msg = message_queue.take();
- batch.add(msg);
-
- //we will discard any message after CLOSE
- if (msg==ControlMessage.CLOSE_MESSAGE)
- return batch;
-
- while (!batch.isFull()) {
- //peek the next message
- msg = message_queue.peek();
- //no more messages
- if (msg == null) break;
-
- //we will discard any message after CLOSE
- if (msg==ControlMessage.CLOSE_MESSAGE) {
- message_queue.take();
- batch.add(msg);
- break;
- }
-
- //try to add this msg into batch
- if (!batch.tryAdd((TaskMessage) msg))
- break;
-
- //remove this message
- message_queue.take();
- }
-
- return batch;
- }
-
- /**
- * gracefully close this client.
- *
- * We will send all existing requests, and then invoke close_n_release() method
- */
- public synchronized void close() {
- if (!being_closed.get()) {
- //enqueue a CLOSE message so that shutdown() will be invoked
- try {
- message_queue.put(ControlMessage.CLOSE_MESSAGE);
- being_closed.set(true);
- } catch (InterruptedException e) {
- close_n_release();
- }
- }
- }
-
- /**
- * close_n_release() is invoked after all messages have been sent.
- */
- void close_n_release() {
- if (channelRef.get() != null)
- channelRef.get().close().awaitUninterruptibly();
-
- //we need to release resources
- new Thread(new Runnable() {
- @Override
- public void run() {
- factory.releaseExternalResources();
- }}).start();
- }
-
- public TaskMessage recv(int flags) {
- throw new RuntimeException("Client connection should not receive any messages");
- }
-
- void setChannel(Channel channel) {
- channelRef.set(channel);
- //reset retries
- if (channel != null)
- retries.set(0);
- }
-
-}
-
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
deleted file mode 100644
index bebd7b6..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.Map;
-import java.util.Vector;
-
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.IContext;
-
-public class Context implements IContext {
- @SuppressWarnings("rawtypes")
- private Map storm_conf;
- private volatile Vector<IConnection> connections;
-
- /**
- * initialization per Storm configuration
- */
- @SuppressWarnings("rawtypes")
- public void prepare(Map storm_conf) {
- this.storm_conf = storm_conf;
- connections = new Vector<IConnection>();
- }
-
- /**
- * establish a server with a binding port
- */
- public IConnection bind(String storm_id, int port) {
- IConnection server = new Server(storm_conf, port);
- connections.add(server);
- return server;
- }
-
- /**
- * establish a connection to a remote server
- */
- public IConnection connect(String storm_id, String host, int port) {
- IConnection client = new Client(storm_conf, host, port);
- connections.add(client);
- return client;
- }
-
- /**
- * terminate this context
- */
- public void term() {
- for (IConnection conn : connections) {
- conn.close();
- }
- connections = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
deleted file mode 100644
index 8b90005..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-enum ControlMessage {
- CLOSE_MESSAGE((short)-100),
- EOB_MESSAGE((short)-201),
- OK_RESPONSE((short)-200),
- FAILURE_RESPONSE((short)-400);
-
- private short code;
-
- //private constructor
- private ControlMessage(short code) {
- this.code = code;
- }
-
- /**
- * Return a control message per an encoded status code
- * @param encoded
- * @return
- */
- static ControlMessage mkMessage(short encoded) {
- for(ControlMessage cm: ControlMessage.values()) {
- if(encoded == cm.code) return cm;
- }
- return null;
- }
-
- int encodeLength() {
- return 2; //short
- }
-
- /**
- * encode the current Control Message into a channel buffer
- * @throws Exception
- */
- ChannelBuffer buffer() throws Exception {
- ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
- write(bout);
- bout.close();
- return bout.buffer();
- }
-
- void write(ChannelBufferOutputStream bout) throws Exception {
- bout.writeShort(code);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
deleted file mode 100644
index a2c52f4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.ArrayList;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-import backtype.storm.messaging.TaskMessage;
-
-class MessageBatch {
- private int buffer_size;
- private ArrayList<Object> msgs;
- private int encoded_length;
-
- MessageBatch(int buffer_size) {
- this.buffer_size = buffer_size;
- msgs = new ArrayList<Object>();
- encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
- }
-
- void add(Object obj) {
- if (obj == null)
- throw new RuntimeException("null object forbidded in message batch");
-
- if (obj instanceof TaskMessage) {
- TaskMessage msg = (TaskMessage)obj;
- msgs.add(msg);
- encoded_length += msgEncodeLength(msg);
- return;
- }
-
- if (obj instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage)obj;
- msgs.add(msg);
- encoded_length += msg.encodeLength();
- return;
- }
-
- throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
- }
-
- void remove(Object obj) {
- if (obj == null) return;
-
- if (obj instanceof TaskMessage) {
- TaskMessage msg = (TaskMessage)obj;
- msgs.remove(msg);
- encoded_length -= msgEncodeLength(msg);
- return;
- }
-
- if (obj instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage)obj;
- msgs.remove(msg);
- encoded_length -= msg.encodeLength();
- return;
- }
- }
-
- Object get(int index) {
- return msgs.get(index);
- }
-
- /**
- * try to add a TaskMessage to a batch
- * @param taskMsg
- * @return false if the msg could not be added due to buffer size limit; true otherwise
- */
- boolean tryAdd(TaskMessage taskMsg) {
- if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
- return false;
- add(taskMsg);
- return true;
- }
-
- private int msgEncodeLength(TaskMessage taskMsg) {
- if (taskMsg == null) return 0;
-
- int size = 6; //INT + SHORT
- if (taskMsg.message() != null)
- size += taskMsg.message().length;
- return size;
- }
-
- /**
- * Has this batch used up allowed buffer size
- * @return
- */
- boolean isFull() {
- return encoded_length >= buffer_size;
- }
-
- /**
- * true if this batch doesn't have any messages
- * @return
- */
- boolean isEmpty() {
- return msgs.isEmpty();
- }
-
- /**
- * # of msgs in this batch
- * @return
- */
- int size() {
- return msgs.size();
- }
-
- /**
- * create a buffer containing the encoding of this batch
- */
- ChannelBuffer buffer() throws Exception {
- ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
-
- for (Object msg : msgs)
- if (msg instanceof TaskMessage)
- writeTaskMessage(bout, (TaskMessage)msg);
- else
- ((ControlMessage)msg).write(bout);
-
- //add a END_OF_BATCH indicator
- ControlMessage.EOB_MESSAGE.write(bout);
-
- bout.close();
-
- return bout.buffer();
- }
-
- /**
- * write a TaskMessage into a stream
- *
- * Each TaskMessage is encoded as:
- * task ... short(2)
- * len ... int(4)
- * payload ... byte[] *
- */
- private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
- int payload_len = 0;
- if (message.message() != null)
- payload_len = message.message().length;
-
- int task_id = message.task();
- if (task_id > Short.MAX_VALUE)
- throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
-
- bout.writeShort((short)task_id);
- bout.writeInt(payload_len);
- if (payload_len >0)
- bout.write(message.message());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
deleted file mode 100644
index 8190e44..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
-import backtype.storm.messaging.TaskMessage;
-
-public class MessageDecoder extends FrameDecoder {
- /*
- * Each ControlMessage is encoded as:
- * code (<0) ... short(2)
- * Each TaskMessage is encoded as:
- * task (>=0) ... short(2)
- * len ... int(4)
- * payload ... byte[] *
- */
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
- // Make sure that we have received at least a short
- if (buf.readableBytes() < 2) {
- //need more data
- return null;
- }
-
- // Mark the current buffer position before reading task/len field
- // because the whole frame might not be in the buffer yet.
- // We will reset the buffer position to the marked position if
- // there's not enough bytes in the buffer.
- buf.markReaderIndex();
-
- //read the short field
- short code = buf.readShort();
-
- //case 1: Control message
- ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
- if (ctrl_msg != null) return ctrl_msg;
-
- //case 2: task Message
- short task = code;
-
- // Make sure that we have received at least an integer (length)
- if (buf.readableBytes() < 4) {
- //need more data
- buf.resetReaderIndex();
- return null;
- }
-
- // Read the length field.
- int length = buf.readInt();
- if (length<=0) {
- return new TaskMessage(task, null);
- }
-
- // Make sure if there's enough bytes in the buffer.
- if (buf.readableBytes() < length) {
- // The whole bytes were not received yet - return null.
- buf.resetReaderIndex();
- return null;
- }
-
- // There's enough bytes in the buffer. Read it.
- ChannelBuffer payload = buf.readBytes(length);
-
- // Successfully decoded a frame.
- // Return a TaskMessage object
- return new TaskMessage(task,payload.array());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
deleted file mode 100644
index c0ac8f1..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-public class MessageEncoder extends OneToOneEncoder {
- @Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
- if (obj instanceof ControlMessage) {
- return ((ControlMessage)obj).buffer();
- }
-
- if (obj instanceof MessageBatch) {
- return ((MessageBatch)obj).buffer();
- }
-
- throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
deleted file mode 100644
index 4119bbf..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Server implements IConnection {
- private static final Logger LOG = LoggerFactory.getLogger(Server.class);
- @SuppressWarnings("rawtypes")
- Map storm_conf;
- int port;
- private LinkedBlockingQueue<TaskMessage> message_queue;
- volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
- final ChannelFactory factory;
- final ServerBootstrap bootstrap;
-
- @SuppressWarnings("rawtypes")
- Server(Map storm_conf, int port) {
- this.storm_conf = storm_conf;
- this.port = port;
- message_queue = new LinkedBlockingQueue<TaskMessage>();
-
- // Configure the server.
- int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
- int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
-
- if (maxWorkers > 0) {
- factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
- } else {
- factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- }
- bootstrap = new ServerBootstrap(factory);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.receiveBufferSize", buffer_size);
- bootstrap.setOption("child.keepAlive", true);
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
-
- // Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(new InetSocketAddress(port));
- allChannels.add(channel);
- }
-
- /**
- * enqueue a received message
- * @param message
- * @throws InterruptedException
- */
- protected void enqueue(TaskMessage message) throws InterruptedException {
- message_queue.put(message);
- LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
- }
-
- /**
- * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
- */
- public TaskMessage recv(int flags) {
- if ((flags & 0x01) == 0x01) {
- //non-blocking
- return message_queue.poll();
- } else {
- try {
- TaskMessage request = message_queue.take();
- LOG.debug("request to be processed: {}", request);
- return request;
- } catch (InterruptedException e) {
- LOG.info("exception within msg receiving", e);
- return null;
- }
- }
- }
-
- /**
- * register a newly created channel
- * @param channel
- */
- protected void addChannel(Channel channel) {
- allChannels.add(channel);
- }
-
- /**
- * close a channel
- * @param channel
- */
- protected void closeChannel(Channel channel) {
- channel.close().awaitUninterruptibly();
- allChannels.remove(channel);
- }
-
- /**
- * close all channels, and release resources
- */
- public synchronized void close() {
- if (allChannels != null) {
- allChannels.close().awaitUninterruptibly();
- factory.releaseExternalResources();
- allChannels = null;
- }
- }
-
- public void send(int task, byte[] message) {
- throw new RuntimeException("Server connection should not send any messages");
- }
-}
[3/4] git commit: Merge branch 'master' into netty-default
Posted by pt...@apache.org.
Merge branch 'master' into netty-default
Conflicts:
bin/install_zmq.sh
storm-core/src/clj/backtype/storm/messaging/zmq.clj
storm-core/src/clj/zilch/mq.clj
storm-core/test/clj/zilch/test/mq.clj
storm-netty/project.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/962d5207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/962d5207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/962d5207
Branch: refs/heads/master
Commit: 962d520777368709cf30d6214719bfa1b81bbd4c
Parents: b63ed13 7e40f9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Dec 16 13:21:19 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Dec 16 13:21:19 2013 -0500
----------------------------------------------------------------------
LICENSE | 285 +++++++++++++++++++
LICENSE.html | 261 -----------------
NOTICE | 12 +-
README.markdown | 23 +-
VERSION | 2 +-
bin/build_modules.sh | 16 ++
bin/build_release.sh | 17 ++
bin/javadoc.sh | 16 ++
bin/storm | 16 ++
bin/to_maven.sh | 17 ++
conf/defaults.yaml | 18 ++
conf/jaas_digest.conf | 18 ++
conf/logback.xml | 17 +-
conf/storm.yaml.example | 16 ++
logback/cluster.xml | 18 +-
project.clj | 15 +
storm-console-logging/project.clj | 15 +
storm-core/project.clj | 21 +-
.../src/clj/backtype/storm/LocalCluster.clj | 15 +
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 15 +
storm-core/src/clj/backtype/storm/bootstrap.clj | 15 +
storm-core/src/clj/backtype/storm/clojure.clj | 15 +
storm-core/src/clj/backtype/storm/cluster.clj | 15 +
.../src/clj/backtype/storm/command/activate.clj | 15 +
.../clj/backtype/storm/command/config_value.clj | 15 +
.../clj/backtype/storm/command/deactivate.clj | 15 +
.../backtype/storm/command/dev_zookeeper.clj | 15 +
.../backtype/storm/command/kill_topology.clj | 15 +
.../src/clj/backtype/storm/command/list.clj | 15 +
.../clj/backtype/storm/command/rebalance.clj | 15 +
.../backtype/storm/command/shell_submission.clj | 15 +
storm-core/src/clj/backtype/storm/config.clj | 15 +
.../src/clj/backtype/storm/daemon/acker.clj | 15 +
.../backtype/storm/daemon/builtin_metrics.clj | 15 +
.../src/clj/backtype/storm/daemon/common.clj | 15 +
.../src/clj/backtype/storm/daemon/drpc.clj | 19 +-
.../src/clj/backtype/storm/daemon/executor.clj | 15 +
.../src/clj/backtype/storm/daemon/logviewer.clj | 15 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 15 +
.../clj/backtype/storm/daemon/supervisor.clj | 15 +
.../src/clj/backtype/storm/daemon/task.clj | 15 +
.../src/clj/backtype/storm/daemon/worker.clj | 15 +
storm-core/src/clj/backtype/storm/disruptor.clj | 15 +
storm-core/src/clj/backtype/storm/event.clj | 15 +
storm-core/src/clj/backtype/storm/log.clj | 15 +
.../src/clj/backtype/storm/messaging/loader.clj | 15 +
.../src/clj/backtype/storm/messaging/local.clj | 15 +
.../src/clj/backtype/storm/metric/testing.clj | 15 +
.../clj/backtype/storm/process_simulator.clj | 15 +
.../storm/scheduler/DefaultScheduler.clj | 15 +
.../backtype/storm/scheduler/EvenScheduler.clj | 15 +
.../storm/scheduler/IsolationScheduler.clj | 15 +
storm-core/src/clj/backtype/storm/stats.clj | 15 +
storm-core/src/clj/backtype/storm/testing.clj | 15 +
storm-core/src/clj/backtype/storm/testing4j.clj | 15 +
storm-core/src/clj/backtype/storm/thrift.clj | 15 +
storm-core/src/clj/backtype/storm/timer.clj | 15 +
storm-core/src/clj/backtype/storm/tuple.clj | 15 +
storm-core/src/clj/backtype/storm/ui/core.clj | 15 +
.../src/clj/backtype/storm/ui/helpers.clj | 15 +
storm-core/src/clj/backtype/storm/util.clj | 15 +
storm-core/src/clj/backtype/storm/zookeeper.clj | 15 +
storm-core/src/clj/storm/trident/testing.clj | 15 +
storm-core/src/dev/resources/tester_bolt.py | 18 ++
storm-core/src/dev/resources/tester_bolt.rb | 18 ++
storm-core/src/dev/resources/tester_spout.py | 18 ++
storm-core/src/dev/resources/tester_spout.rb | 17 ++
storm-core/src/genthrift.sh | 16 ++
storm-core/src/jvm/backtype/storm/Config.java | 17 ++
.../jvm/backtype/storm/ConfigValidation.java | 17 ++
.../src/jvm/backtype/storm/Constants.java | 17 ++
.../src/jvm/backtype/storm/ILocalCluster.java | 17 ++
.../src/jvm/backtype/storm/ILocalDRPC.java | 17 ++
.../src/jvm/backtype/storm/StormSubmitter.java | 17 ++
.../jvm/backtype/storm/clojure/ClojureBolt.java | 17 ++
.../backtype/storm/clojure/ClojureSpout.java | 17 ++
.../backtype/storm/clojure/RichShellBolt.java | 17 ++
.../backtype/storm/clojure/RichShellSpout.java | 17 ++
.../storm/coordination/BatchBoltExecutor.java | 17 ++
.../coordination/BatchOutputCollector.java | 17 ++
.../coordination/BatchOutputCollectorImpl.java | 17 ++
.../coordination/BatchSubtopologyBuilder.java | 17 ++
.../storm/coordination/CoordinatedBolt.java | 17 ++
.../backtype/storm/coordination/IBatchBolt.java | 17 ++
.../jvm/backtype/storm/daemon/Shutdownable.java | 17 ++
.../storm/drpc/DRPCInvocationsClient.java | 17 ++
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 17 ++
.../src/jvm/backtype/storm/drpc/JoinResult.java | 17 ++
.../jvm/backtype/storm/drpc/KeyedFairBolt.java | 17 ++
.../storm/drpc/LinearDRPCInputDeclarer.java | 17 ++
.../storm/drpc/LinearDRPCTopologyBuilder.java | 17 ++
.../jvm/backtype/storm/drpc/PrepareRequest.java | 17 ++
.../jvm/backtype/storm/drpc/ReturnResults.java | 17 ++
.../storm/generated/AlreadyAliveException.java | 17 ++
.../src/jvm/backtype/storm/generated/Bolt.java | 17 ++
.../jvm/backtype/storm/generated/BoltStats.java | 17 ++
.../storm/generated/ClusterSummary.java | 17 ++
.../storm/generated/ComponentCommon.java | 17 ++
.../storm/generated/ComponentObject.java | 17 ++
.../storm/generated/DRPCExecutionException.java | 17 ++
.../backtype/storm/generated/DRPCRequest.java | 17 ++
.../storm/generated/DistributedRPC.java | 17 ++
.../generated/DistributedRPCInvocations.java | 17 ++
.../jvm/backtype/storm/generated/ErrorInfo.java | 17 ++
.../backtype/storm/generated/ExecutorInfo.java | 17 ++
.../storm/generated/ExecutorSpecificStats.java | 17 ++
.../backtype/storm/generated/ExecutorStats.java | 17 ++
.../storm/generated/ExecutorSummary.java | 17 ++
.../storm/generated/GlobalStreamId.java | 17 ++
.../jvm/backtype/storm/generated/Grouping.java | 17 ++
.../generated/InvalidTopologyException.java | 17 ++
.../backtype/storm/generated/JavaObject.java | 17 ++
.../backtype/storm/generated/JavaObjectArg.java | 17 ++
.../backtype/storm/generated/KillOptions.java | 17 ++
.../jvm/backtype/storm/generated/Nimbus.java | 17 ++
.../storm/generated/NotAliveException.java | 17 ++
.../backtype/storm/generated/NullStruct.java | 17 ++
.../storm/generated/RebalanceOptions.java | 17 ++
.../storm/generated/ShellComponent.java | 17 ++
.../jvm/backtype/storm/generated/SpoutSpec.java | 17 ++
.../backtype/storm/generated/SpoutStats.java | 17 ++
.../storm/generated/StateSpoutSpec.java | 17 ++
.../backtype/storm/generated/StormTopology.java | 17 ++
.../backtype/storm/generated/StreamInfo.java | 17 ++
.../backtype/storm/generated/SubmitOptions.java | 17 ++
.../storm/generated/SupervisorSummary.java | 17 ++
.../backtype/storm/generated/TopologyInfo.java | 17 ++
.../storm/generated/TopologyInitialStatus.java | 17 ++
.../storm/generated/TopologySummary.java | 17 ++
.../storm/grouping/CustomStreamGrouping.java | 17 ++
.../jvm/backtype/storm/hooks/BaseTaskHook.java | 17 ++
.../src/jvm/backtype/storm/hooks/ITaskHook.java | 17 ++
.../backtype/storm/hooks/info/BoltAckInfo.java | 17 ++
.../storm/hooks/info/BoltExecuteInfo.java | 17 ++
.../backtype/storm/hooks/info/BoltFailInfo.java | 17 ++
.../jvm/backtype/storm/hooks/info/EmitInfo.java | 17 ++
.../backtype/storm/hooks/info/SpoutAckInfo.java | 17 ++
.../storm/hooks/info/SpoutFailInfo.java | 17 ++
.../backtype/storm/messaging/IConnection.java | 17 ++
.../jvm/backtype/storm/messaging/IContext.java | 17 ++
.../backtype/storm/messaging/TaskMessage.java | 17 ++
.../storm/messaging/TransportFactory.java | 17 ++
.../backtype/storm/messaging/netty/Client.java | 17 ++
.../backtype/storm/messaging/netty/Context.java | 17 ++
.../storm/messaging/netty/ControlMessage.java | 17 ++
.../storm/messaging/netty/MessageBatch.java | 17 ++
.../storm/messaging/netty/MessageDecoder.java | 17 ++
.../storm/messaging/netty/MessageEncoder.java | 17 ++
.../backtype/storm/messaging/netty/Server.java | 17 ++
.../messaging/netty/StormClientHandler.java | 17 ++
.../netty/StormClientPipelineFactory.java | 17 ++
.../messaging/netty/StormServerHandler.java | 17 ++
.../netty/StormServerPipelineFactory.java | 17 ++
.../storm/metric/LoggingMetricsConsumer.java | 17 ++
.../storm/metric/MetricsConsumerBolt.java | 17 ++
.../jvm/backtype/storm/metric/SystemBolt.java | 17 ++
.../storm/metric/api/AssignableMetric.java | 17 ++
.../storm/metric/api/CombinedMetric.java | 17 ++
.../backtype/storm/metric/api/CountMetric.java | 17 ++
.../backtype/storm/metric/api/ICombiner.java | 17 ++
.../jvm/backtype/storm/metric/api/IMetric.java | 17 ++
.../storm/metric/api/IMetricsConsumer.java | 17 ++
.../jvm/backtype/storm/metric/api/IReducer.java | 17 ++
.../storm/metric/api/IStatefulObject.java | 17 ++
.../backtype/storm/metric/api/MeanReducer.java | 17 ++
.../storm/metric/api/MultiCountMetric.java | 17 ++
.../storm/metric/api/MultiReducedMetric.java | 17 ++
.../storm/metric/api/ReducedMetric.java | 17 ++
.../backtype/storm/metric/api/StateMetric.java | 17 ++
.../storm/nimbus/DefaultTopologyValidator.java | 17 ++
.../storm/nimbus/ITopologyValidator.java | 17 ++
.../backtype/storm/planner/CompoundSpout.java | 17 ++
.../backtype/storm/planner/CompoundTask.java | 17 ++
.../jvm/backtype/storm/planner/TaskBundle.java | 17 ++
.../jvm/backtype/storm/scheduler/Cluster.java | 17 ++
.../storm/scheduler/ExecutorDetails.java | 91 +++---
.../jvm/backtype/storm/scheduler/INimbus.java | 17 ++
.../backtype/storm/scheduler/IScheduler.java | 17 ++
.../backtype/storm/scheduler/ISupervisor.java | 17 ++
.../storm/scheduler/SchedulerAssignment.java | 97 ++++---
.../scheduler/SchedulerAssignmentImpl.java | 201 +++++++------
.../storm/scheduler/SupervisorDetails.java | 17 ++
.../backtype/storm/scheduler/Topologies.java | 97 ++++---
.../storm/scheduler/TopologyDetails.java | 17 ++
.../backtype/storm/scheduler/WorkerSlot.java | 17 ++
.../backtype/storm/security/auth/AuthUtils.java | 17 ++
.../storm/security/auth/IAuthorizer.java | 17 ++
.../storm/security/auth/ITransportPlugin.java | 17 ++
.../storm/security/auth/ReqContext.java | 17 ++
.../security/auth/SaslTransportPlugin.java | 17 ++
.../security/auth/SimpleTransportPlugin.java | 17 ++
.../storm/security/auth/ThriftClient.java | 17 ++
.../storm/security/auth/ThriftServer.java | 17 ++
.../auth/authorizer/DenyAuthorizer.java | 17 ++
.../auth/authorizer/NoopAuthorizer.java | 17 ++
.../auth/digest/ClientCallbackHandler.java | 17 ++
.../auth/digest/DigestSaslTransportPlugin.java | 17 ++
.../auth/digest/ServerCallbackHandler.java | 17 ++
.../serialization/BlowfishTupleSerializer.java | 17 ++
.../storm/serialization/DefaultKryoFactory.java | 17 ++
.../storm/serialization/IKryoDecorator.java | 17 ++
.../storm/serialization/IKryoFactory.java | 17 ++
.../storm/serialization/ITupleDeserializer.java | 17 ++
.../storm/serialization/ITupleSerializer.java | 17 ++
.../serialization/KryoTupleDeserializer.java | 17 ++
.../serialization/KryoTupleSerializer.java | 17 ++
.../serialization/KryoValuesDeserializer.java | 17 ++
.../serialization/KryoValuesSerializer.java | 17 ++
.../serialization/SerializableSerializer.java | 17 ++
.../serialization/SerializationFactory.java | 17 ++
.../types/ArrayListSerializer.java | 17 ++
.../serialization/types/HashMapSerializer.java | 17 ++
.../serialization/types/HashSetSerializer.java | 17 ++
.../types/ListDelegateSerializer.java | 17 ++
.../storm/spout/IMultiSchemableSpout.java | 17 ++
.../backtype/storm/spout/ISchemableSpout.java | 17 ++
.../src/jvm/backtype/storm/spout/ISpout.java | 17 ++
.../storm/spout/ISpoutOutputCollector.java | 17 ++
.../storm/spout/ISpoutWaitStrategy.java | 17 ++
.../jvm/backtype/storm/spout/MultiScheme.java | 17 ++
.../storm/spout/NothingEmptyEmitStrategy.java | 17 ++
.../backtype/storm/spout/RawMultiScheme.java | 17 ++
.../src/jvm/backtype/storm/spout/RawScheme.java | 17 ++
.../src/jvm/backtype/storm/spout/Scheme.java | 17 ++
.../storm/spout/SchemeAsMultiScheme.java | 17 ++
.../jvm/backtype/storm/spout/ShellSpout.java | 17 ++
.../storm/spout/SleepSpoutWaitStrategy.java | 17 ++
.../storm/spout/SpoutOutputCollector.java | 17 ++
.../jvm/backtype/storm/state/IStateSpout.java | 17 ++
.../storm/state/IStateSpoutOutputCollector.java | 17 ++
.../backtype/storm/state/ISubscribedState.java | 17 ++
.../state/ISynchronizeOutputCollector.java | 17 ++
.../storm/state/StateSpoutOutputCollector.java | 17 ++
.../storm/state/SynchronizeOutputCollector.java | 17 ++
.../storm/task/GeneralTopologyContext.java | 17 ++
.../src/jvm/backtype/storm/task/IBolt.java | 17 ++
.../jvm/backtype/storm/task/IErrorReporter.java | 17 ++
.../backtype/storm/task/IMetricsContext.java | 17 ++
.../backtype/storm/task/IOutputCollector.java | 17 ++
.../backtype/storm/task/OutputCollector.java | 17 ++
.../src/jvm/backtype/storm/task/ShellBolt.java | 17 ++
.../backtype/storm/task/TopologyContext.java | 17 ++
.../storm/task/WorkerTopologyContext.java | 17 ++
.../backtype/storm/testing/AckFailDelegate.java | 17 ++
.../storm/testing/AckFailMapTracker.java | 17 ++
.../jvm/backtype/storm/testing/AckTracker.java | 17 ++
.../backtype/storm/testing/BatchNumberList.java | 17 ++
.../storm/testing/BatchProcessWord.java | 17 ++
.../backtype/storm/testing/BatchRepeatA.java | 17 ++
.../jvm/backtype/storm/testing/BoltTracker.java | 17 ++
.../storm/testing/CompleteTopologyParam.java | 117 ++++----
.../storm/testing/CountingBatchBolt.java | 17 ++
.../storm/testing/CountingCommitBolt.java | 17 ++
.../jvm/backtype/storm/testing/FeederSpout.java | 17 ++
.../jvm/backtype/storm/testing/FixedTuple.java | 17 ++
.../backtype/storm/testing/FixedTupleSpout.java | 17 ++
.../backtype/storm/testing/IdentityBolt.java | 17 ++
.../storm/testing/KeyedCountingBatchBolt.java | 17 ++
.../testing/KeyedCountingCommitterBolt.java | 17 ++
.../storm/testing/KeyedSummingBatchBolt.java | 17 ++
.../storm/testing/MemoryTransactionalSpout.java | 17 ++
.../testing/MemoryTransactionalSpoutMeta.java | 17 ++
.../backtype/storm/testing/MkClusterParam.java | 97 ++++---
.../backtype/storm/testing/MkTupleParam.java | 85 +++---
.../backtype/storm/testing/MockedSources.java | 103 ++++---
.../jvm/backtype/storm/testing/NGrouping.java | 17 ++
.../storm/testing/NonRichBoltTracker.java | 17 ++
.../testing/OpaqueMemoryTransactionalSpout.java | 17 ++
.../storm/testing/PrepareBatchBolt.java | 17 ++
.../backtype/storm/testing/SpoutTracker.java | 17 ++
.../storm/testing/TestAggregatesCounter.java | 17 ++
.../backtype/storm/testing/TestConfBolt.java | 17 ++
.../backtype/storm/testing/TestGlobalCount.java | 17 ++
.../src/jvm/backtype/storm/testing/TestJob.java | 65 +++--
.../storm/testing/TestKryoDecorator.java | 17 ++
.../backtype/storm/testing/TestPlannerBolt.java | 17 ++
.../storm/testing/TestPlannerSpout.java | 17 ++
.../backtype/storm/testing/TestSerObject.java | 17 ++
.../backtype/storm/testing/TestWordCounter.java | 17 ++
.../backtype/storm/testing/TestWordSpout.java | 17 ++
.../backtype/storm/testing/TrackedTopology.java | 51 ++--
.../storm/testing/TupleCaptureBolt.java | 17 ++
.../topology/BaseConfigurationDeclarer.java | 17 ++
.../storm/topology/BasicBoltExecutor.java | 17 ++
.../storm/topology/BasicOutputCollector.java | 17 ++
.../backtype/storm/topology/BoltDeclarer.java | 17 ++
.../ComponentConfigurationDeclarer.java | 17 ++
.../storm/topology/FailedException.java | 17 ++
.../jvm/backtype/storm/topology/IBasicBolt.java | 17 ++
.../storm/topology/IBasicOutputCollector.java | 17 ++
.../jvm/backtype/storm/topology/IComponent.java | 17 ++
.../jvm/backtype/storm/topology/IRichBolt.java | 17 ++
.../jvm/backtype/storm/topology/IRichSpout.java | 17 ++
.../storm/topology/IRichStateSpout.java | 17 ++
.../backtype/storm/topology/InputDeclarer.java | 17 ++
.../storm/topology/OutputFieldsDeclarer.java | 17 ++
.../storm/topology/OutputFieldsGetter.java | 17 ++
.../storm/topology/ReportedFailedException.java | 17 ++
.../backtype/storm/topology/SpoutDeclarer.java | 17 ++
.../storm/topology/TopologyBuilder.java | 17 ++
.../storm/topology/base/BaseBasicBolt.java | 17 ++
.../storm/topology/base/BaseBatchBolt.java | 17 ++
.../storm/topology/base/BaseComponent.java | 17 ++
...BaseOpaquePartitionedTransactionalSpout.java | 17 ++
.../base/BasePartitionedTransactionalSpout.java | 17 ++
.../storm/topology/base/BaseRichBolt.java | 17 ++
.../storm/topology/base/BaseRichSpout.java | 17 ++
.../topology/base/BaseTransactionalBolt.java | 17 ++
.../topology/base/BaseTransactionalSpout.java | 17 ++
.../storm/transactional/ICommitter.java | 17 ++
.../ICommitterTransactionalSpout.java | 17 ++
.../transactional/ITransactionalSpout.java | 17 ++
.../storm/transactional/TransactionAttempt.java | 17 ++
.../TransactionalSpoutBatchExecutor.java | 17 ++
.../TransactionalSpoutCoordinator.java | 17 ++
.../TransactionalTopologyBuilder.java | 17 ++
.../IOpaquePartitionedTransactionalSpout.java | 17 ++
.../IPartitionedTransactionalSpout.java | 17 ++
...uePartitionedTransactionalSpoutExecutor.java | 17 ++
.../PartitionedTransactionalSpoutExecutor.java | 17 ++
.../state/RotatingTransactionalState.java | 17 ++
.../transactional/state/TransactionalState.java | 17 ++
.../src/jvm/backtype/storm/tuple/Fields.java | 17 ++
.../src/jvm/backtype/storm/tuple/MessageId.java | 17 ++
.../src/jvm/backtype/storm/tuple/Tuple.java | 17 ++
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 17 ++
.../src/jvm/backtype/storm/tuple/Values.java | 17 ++
.../storm/utils/BufferFileInputStream.java | 17 ++
.../backtype/storm/utils/CRC32OutputStream.java | 17 ++
.../backtype/storm/utils/ClojureTimerTask.java | 17 ++
.../src/jvm/backtype/storm/utils/Container.java | 17 ++
.../jvm/backtype/storm/utils/DRPCClient.java | 17 ++
.../backtype/storm/utils/DisruptorQueue.java | 17 ++
.../storm/utils/IndifferentAccessMap.java | 17 ++
.../backtype/storm/utils/InprocMessaging.java | 17 ++
.../storm/utils/KeyedRoundRobinQueue.java | 17 ++
.../jvm/backtype/storm/utils/ListDelegate.java | 17 ++
.../jvm/backtype/storm/utils/LocalState.java | 17 ++
.../jvm/backtype/storm/utils/MutableInt.java | 17 ++
.../jvm/backtype/storm/utils/MutableLong.java | 17 ++
.../jvm/backtype/storm/utils/MutableObject.java | 17 ++
.../jvm/backtype/storm/utils/NimbusClient.java | 17 ++
.../storm/utils/RegisteredGlobalState.java | 17 ++
.../jvm/backtype/storm/utils/RotatingMap.java | 17 ++
.../backtype/storm/utils/ServiceRegistry.java | 17 ++
.../jvm/backtype/storm/utils/ShellProcess.java | 17 ++
.../storm/utils/ThriftTopologyUtils.java | 17 ++
.../src/jvm/backtype/storm/utils/Time.java | 17 ++
.../jvm/backtype/storm/utils/TimeCacheMap.java | 17 ++
.../src/jvm/backtype/storm/utils/Utils.java | 17 ++
.../backtype/storm/utils/VersionedStore.java | 17 ++
.../storm/utils/WindowedTimeThrottler.java | 17 ++
.../jvm/backtype/storm/utils/WritableUtils.java | 17 ++
.../backtype/storm/utils/ZookeeperAuthInfo.java | 17 ++
storm-core/src/jvm/storm/trident/JoinType.java | 17 ++
storm-core/src/jvm/storm/trident/Stream.java | 17 ++
.../src/jvm/storm/trident/TridentState.java | 17 ++
.../src/jvm/storm/trident/TridentTopology.java | 17 ++
.../trident/drpc/ReturnResultsReducer.java | 17 ++
.../fluent/ChainedAggregatorDeclarer.java | 17 ++
.../fluent/ChainedFullAggregatorDeclarer.java | 17 ++
.../ChainedPartitionAggregatorDeclarer.java | 17 ++
.../trident/fluent/GlobalAggregationScheme.java | 17 ++
.../jvm/storm/trident/fluent/GroupedStream.java | 17 ++
.../trident/fluent/IAggregatableStream.java | 17 ++
.../fluent/IChainedAggregatorDeclarer.java | 17 ++
.../jvm/storm/trident/fluent/UniqueIdGen.java | 17 ++
.../jvm/storm/trident/graph/GraphGrouper.java | 17 ++
.../src/jvm/storm/trident/graph/Group.java | 17 ++
.../jvm/storm/trident/operation/Aggregator.java | 17 ++
.../jvm/storm/trident/operation/Assembly.java | 17 ++
.../storm/trident/operation/BaseAggregator.java | 17 ++
.../jvm/storm/trident/operation/BaseFilter.java | 17 ++
.../storm/trident/operation/BaseFunction.java | 17 ++
.../trident/operation/BaseMultiReducer.java | 17 ++
.../storm/trident/operation/BaseOperation.java | 17 ++
.../trident/operation/CombinerAggregator.java | 17 ++
.../storm/trident/operation/EachOperation.java | 17 ++
.../src/jvm/storm/trident/operation/Filter.java | 17 ++
.../jvm/storm/trident/operation/Function.java | 17 ++
.../trident/operation/GroupedMultiReducer.java | 17 ++
.../storm/trident/operation/MultiReducer.java | 17 ++
.../jvm/storm/trident/operation/Operation.java | 17 ++
.../trident/operation/ReducerAggregator.java | 17 ++
.../trident/operation/TridentCollector.java | 17 ++
.../operation/TridentMultiReducerContext.java | 17 ++
.../operation/TridentOperationContext.java | 17 ++
.../storm/trident/operation/builtin/Count.java | 17 ++
.../storm/trident/operation/builtin/Debug.java | 17 ++
.../storm/trident/operation/builtin/Equals.java | 17 ++
.../trident/operation/builtin/FilterNull.java | 17 ++
.../storm/trident/operation/builtin/FirstN.java | 17 ++
.../storm/trident/operation/builtin/MapGet.java | 17 ++
.../storm/trident/operation/builtin/Negate.java | 17 ++
.../trident/operation/builtin/SnapshotGet.java | 17 ++
.../storm/trident/operation/builtin/Sum.java | 17 ++
.../operation/builtin/TupleCollectionGet.java | 17 ++
.../operation/impl/CaptureCollector.java | 17 ++
.../operation/impl/ChainedAggregatorImpl.java | 17 ++
.../trident/operation/impl/ChainedResult.java | 17 ++
.../operation/impl/CombinerAggStateUpdater.java | 17 ++
.../impl/CombinerAggregatorCombineImpl.java | 17 ++
.../impl/CombinerAggregatorInitImpl.java | 17 ++
.../trident/operation/impl/FilterExecutor.java | 17 ++
.../operation/impl/GlobalBatchToPartition.java | 17 ++
.../trident/operation/impl/GroupCollector.java | 17 ++
.../operation/impl/GroupedAggregator.java | 17 ++
.../impl/GroupedMultiReducerExecutor.java | 17 ++
.../operation/impl/IdentityMultiReducer.java | 17 ++
.../impl/IndexHashBatchToPartition.java | 17 ++
.../operation/impl/JoinerMultiReducer.java | 17 ++
.../operation/impl/ReducerAggStateUpdater.java | 17 ++
.../operation/impl/ReducerAggregatorImpl.java | 17 ++
.../storm/trident/operation/impl/Result.java | 17 ++
.../operation/impl/SingleEmitAggregator.java | 17 ++
.../trident/operation/impl/TrueFilter.java | 17 ++
.../storm/trident/partition/GlobalGrouping.java | 17 ++
.../trident/partition/IdentityGrouping.java | 17 ++
.../trident/partition/IndexHashGrouping.java | 17 ++
.../storm/trident/planner/BridgeReceiver.java | 17 ++
.../src/jvm/storm/trident/planner/Node.java | 17 ++
.../storm/trident/planner/NodeStateInfo.java | 17 ++
.../storm/trident/planner/PartitionNode.java | 17 ++
.../storm/trident/planner/ProcessorContext.java | 17 ++
.../storm/trident/planner/ProcessorNode.java | 17 ++
.../jvm/storm/trident/planner/SpoutNode.java | 17 ++
.../storm/trident/planner/SubtopologyBolt.java | 17 ++
.../storm/trident/planner/TridentProcessor.java | 17 ++
.../storm/trident/planner/TupleReceiver.java | 17 ++
.../planner/processor/AggregateProcessor.java | 17 ++
.../planner/processor/AppendCollector.java | 17 ++
.../planner/processor/EachProcessor.java | 17 ++
.../planner/processor/FreshCollector.java | 17 ++
.../processor/MultiReducerProcessor.java | 17 ++
.../processor/PartitionPersistProcessor.java | 17 ++
.../planner/processor/ProjectedProcessor.java | 17 ++
.../planner/processor/StateQueryProcessor.java | 17 ++
.../planner/processor/TridentContext.java | 17 ++
.../storm/trident/spout/BatchSpoutExecutor.java | 17 ++
.../src/jvm/storm/trident/spout/IBatchID.java | 17 ++
.../jvm/storm/trident/spout/IBatchSpout.java | 17 ++
.../trident/spout/ICommitterTridentSpout.java | 17 ++
.../spout/IOpaquePartitionedTridentSpout.java | 17 ++
.../trident/spout/IPartitionedTridentSpout.java | 17 ++
.../storm/trident/spout/ISpoutPartition.java | 17 ++
.../jvm/storm/trident/spout/ITridentSpout.java | 17 ++
.../OpaquePartitionedTridentSpoutExecutor.java | 17 ++
.../spout/PartitionedTridentSpoutExecutor.java | 17 ++
.../trident/spout/RichSpoutBatchExecutor.java | 17 ++
.../storm/trident/spout/RichSpoutBatchId.java | 17 ++
.../spout/RichSpoutBatchIdSerializer.java | 17 ++
.../trident/spout/RichSpoutBatchTriggerer.java | 17 ++
.../trident/spout/TridentSpoutCoordinator.java | 17 ++
.../trident/spout/TridentSpoutExecutor.java | 17 ++
.../storm/trident/state/BaseQueryFunction.java | 17 ++
.../storm/trident/state/BaseStateUpdater.java | 17 ++
.../trident/state/CombinerValueUpdater.java | 17 ++
.../storm/trident/state/ITupleCollection.java | 17 ++
.../state/JSONNonTransactionalSerializer.java | 17 ++
.../trident/state/JSONOpaqueSerializer.java | 17 ++
.../state/JSONTransactionalSerializer.java | 17 ++
.../jvm/storm/trident/state/OpaqueValue.java | 17 ++
.../jvm/storm/trident/state/QueryFunction.java | 17 ++
.../jvm/storm/trident/state/ReadOnlyState.java | 17 ++
.../trident/state/ReducerValueUpdater.java | 17 ++
.../src/jvm/storm/trident/state/Serializer.java | 17 ++
.../src/jvm/storm/trident/state/State.java | 17 ++
.../jvm/storm/trident/state/StateFactory.java | 17 ++
.../src/jvm/storm/trident/state/StateSpec.java | 17 ++
.../src/jvm/storm/trident/state/StateType.java | 17 ++
.../jvm/storm/trident/state/StateUpdater.java | 17 ++
.../storm/trident/state/TransactionalValue.java | 17 ++
.../jvm/storm/trident/state/ValueUpdater.java | 17 ++
.../trident/state/map/CachedBatchReadsMap.java | 17 ++
.../jvm/storm/trident/state/map/CachedMap.java | 17 ++
.../storm/trident/state/map/IBackingMap.java | 17 ++
.../state/map/MapCombinerAggStateUpdater.java | 17 ++
.../state/map/MapReducerAggStateUpdater.java | 17 ++
.../jvm/storm/trident/state/map/MapState.java | 17 ++
.../state/map/MicroBatchIBackingMap.java | 17 ++
.../trident/state/map/NonTransactionalMap.java | 17 ++
.../jvm/storm/trident/state/map/OpaqueMap.java | 17 ++
.../trident/state/map/ReadOnlyMapState.java | 17 ++
.../trident/state/map/SnapshottableMap.java | 17 ++
.../trident/state/map/TransactionalMap.java | 17 ++
.../state/snapshot/ReadOnlySnapshottable.java | 17 ++
.../trident/state/snapshot/Snapshottable.java | 17 ++
.../trident/testing/CountAsAggregator.java | 17 ++
.../storm/trident/testing/FeederBatchSpout.java | 17 ++
.../testing/FeederCommitterBatchSpout.java | 17 ++
.../storm/trident/testing/FixedBatchSpout.java | 17 ++
.../src/jvm/storm/trident/testing/IFeeder.java | 17 ++
.../trident/testing/LRUMemoryMapState.java | 17 ++
.../storm/trident/testing/MemoryBackingMap.java | 17 ++
.../storm/trident/testing/MemoryMapState.java | 17 ++
.../storm/trident/testing/MockTridentTuple.java | 17 ++
.../src/jvm/storm/trident/testing/Split.java | 17 ++
.../jvm/storm/trident/testing/StringLength.java | 17 ++
.../jvm/storm/trident/testing/TrueFilter.java | 17 ++
.../jvm/storm/trident/testing/TuplifyArgs.java | 17 ++
.../jvm/storm/trident/topology/BatchInfo.java | 17 ++
.../trident/topology/ITridentBatchBolt.java | 17 ++
.../topology/MasterBatchCoordinator.java | 17 ++
.../trident/topology/TransactionAttempt.java | 17 ++
.../trident/topology/TridentBoltExecutor.java | 17 ++
.../topology/TridentTopologyBuilder.java | 17 ++
.../state/RotatingTransactionalState.java | 17 ++
.../topology/state/TransactionalState.java | 17 ++
.../src/jvm/storm/trident/tuple/ComboList.java | 17 ++
.../src/jvm/storm/trident/tuple/ConsList.java | 17 ++
.../jvm/storm/trident/tuple/TridentTuple.java | 17 ++
.../storm/trident/tuple/TridentTupleView.java | 17 ++
.../jvm/storm/trident/tuple/ValuePointer.java | 17 ++
.../storm/trident/util/ErrorEdgeFactory.java | 17 ++
.../src/jvm/storm/trident/util/IndexedEdge.java | 17 ++
.../src/jvm/storm/trident/util/LRUMap.java | 17 ++
.../jvm/storm/trident/util/TridentUtils.java | 17 ++
storm-core/src/multilang/py/storm.py | 18 ++
storm-core/src/multilang/rb/storm.rb | 18 ++
storm-core/src/storm.thrift | 23 ++
storm-core/src/ui/public/css/style.css | 17 ++
storm-core/src/ui/public/js/script.js | 17 ++
.../test/clj/backtype/storm/clojure_test.clj | 15 +
.../test/clj/backtype/storm/cluster_test.clj | 15 +
.../test/clj/backtype/storm/config_test.clj | 15 +
.../test/clj/backtype/storm/drpc_test.clj | 15 +
.../test/clj/backtype/storm/fields_test.clj | 15 +
.../test/clj/backtype/storm/grouping_test.clj | 15 +
.../clj/backtype/storm/integration_test.clj | 15 +
.../clj/backtype/storm/local_state_test.clj | 15 +
.../storm/messaging/netty_integration_test.clj | 15 +
.../storm/messaging/netty_unit_test.clj | 15 +
.../test/clj/backtype/storm/messaging_test.clj | 15 +
.../test/clj/backtype/storm/metrics_test.clj | 15 +
.../test/clj/backtype/storm/multilang_test.clj | 15 +
.../test/clj/backtype/storm/nimbus_test.clj | 15 +
.../test/clj/backtype/storm/scheduler_test.clj | 15 +
.../storm/security/auth/AuthUtils_test.clj | 15 +
.../storm/security/auth/ReqContext_test.clj | 15 +
.../security/auth/SaslTransportPlugin_test.clj | 15 +
.../storm/security/auth/ThriftClient_test.clj | 15 +
.../storm/security/auth/ThriftServer_test.clj | 15 +
.../backtype/storm/security/auth/auth_test.clj | 15 +
.../BlowfishTupleSerializer_test.clj | 15 +
.../serialization/SerializationFactory_test.clj | 15 +
.../clj/backtype/storm/serialization_test.clj | 15 +
.../clj/backtype/storm/subtopology_test.clj | 15 +
.../test/clj/backtype/storm/supervisor_test.clj | 15 +
.../test/clj/backtype/storm/testing4j_test.clj | 15 +
.../test/clj/backtype/storm/tick_tuple_test.clj | 15 +
.../clj/backtype/storm/transactional_test.clj | 15 +
.../test/clj/backtype/storm/tuple_test.clj | 15 +
.../test/clj/backtype/storm/utils_test.clj | 15 +
.../clj/backtype/storm/versioned_store_test.clj | 15 +
.../test/clj/storm/trident/integration_test.clj | 15 +
.../test/clj/storm/trident/state_test.clj | 15 +
.../test/clj/storm/trident/tuple_test.clj | 15 +
storm-lib/project.clj | 15 +
558 files changed, 9974 insertions(+), 695 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/project.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c2b391a,0000000..d765e71
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@@ -1,204 -1,0 +1,221 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class Client implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ private final int max_retries;
+ private final int base_sleep_ms;
+ private final int max_sleep_ms;
+ private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
+ private AtomicReference<Channel> channelRef;
+ private final ClientBootstrap bootstrap;
+ private InetSocketAddress remote_addr;
+ private AtomicInteger retries;
+ private final Random random = new Random();
+ private final ChannelFactory factory;
+ private final int buffer_size;
+ private final AtomicBoolean being_closed;
+
+ @SuppressWarnings("rawtypes")
+ Client(Map storm_conf, String host, int port) {
+ message_queue = new LinkedBlockingQueue<Object>();
+ retries = new AtomicInteger(0);
+ channelRef = new AtomicReference<Channel>(null);
+ being_closed = new AtomicBoolean(false);
+
+ // Configure
+ buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+ base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+ max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("sendBufferSize", buffer_size);
+ bootstrap.setOption("keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+
+ // Start the connection attempt.
+ remote_addr = new InetSocketAddress(host, port);
+ bootstrap.connect(remote_addr);
+ }
+
+ /**
+ * We will retry connection with exponential back-off policy
+ */
+ void reconnect() {
+ try {
+ int tried_count = retries.incrementAndGet();
+ if (tried_count <= max_retries) {
+ Thread.sleep(getSleepTimeMs());
+ LOG.info("Reconnect ... [{}]", tried_count);
+ bootstrap.connect(remote_addr);
+ LOG.debug("connection started...");
+ } else {
+ LOG.warn("Remote address is not reachable. We will close this client.");
+ close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("connection failed", e);
+ }
+ }
+
+ /**
+ * # of milliseconds to wait per exponential back-off policy
+ */
+ private int getSleepTimeMs()
+ {
+ int backoff = 1 << retries.get();
+ int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
+ if ( sleepMs > max_sleep_ms )
+ sleepMs = max_sleep_ms;
+ return sleepMs;
+ }
+
+ /**
+ * Enqueue a task message to be sent to server
+ */
+ public void send(int task, byte[] message) {
+ //throw exception if the client is being closed
+ if (being_closed.get()) {
+ throw new RuntimeException("Client is being closed, and does not take requests any more");
+ }
+
+ try {
+ message_queue.put(new TaskMessage(task, message));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Take all enqueued messages from queue
+ * @return
+ * @throws InterruptedException
+ */
+ MessageBatch takeMessages() throws InterruptedException {
+ //1st message
+ MessageBatch batch = new MessageBatch(buffer_size);
+ Object msg = message_queue.take();
+ batch.add(msg);
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE)
+ return batch;
+
+ while (!batch.isFull()) {
+ //peek the next message
+ msg = message_queue.peek();
+ //no more messages
+ if (msg == null) break;
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE) {
+ message_queue.take();
+ batch.add(msg);
+ break;
+ }
+
+ //try to add this msg into batch
+ if (!batch.tryAdd((TaskMessage) msg))
+ break;
+
+ //remove this message
+ message_queue.take();
+ }
+
+ return batch;
+ }
+
+ /**
+ * gracefully close this client.
+ *
+ * We will send all existing requests, and then invoke close_n_release() method
+ */
+ public synchronized void close() {
+ if (!being_closed.get()) {
+ //enqueue a CLOSE message so that shutdown() will be invoked
+ try {
+ message_queue.put(ControlMessage.CLOSE_MESSAGE);
+ being_closed.set(true);
+ } catch (InterruptedException e) {
+ close_n_release();
+ }
+ }
+ }
+
+ /**
+ * close_n_release() is invoked after all messages have been sent.
+ */
+ void close_n_release() {
+ if (channelRef.get() != null)
+ channelRef.get().close().awaitUninterruptibly();
+
+ //we need to release resources
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ factory.releaseExternalResources();
+ }}).start();
+ }
+
+ public TaskMessage recv(int flags) {
+ throw new RuntimeException("Client connection should not receive any messages");
+ }
+
+ void setChannel(Channel channel) {
+ channelRef.set(channel);
+ //reset retries
+ if (channel != null)
+ retries.set(0);
+ }
+
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 018e0f9,0000000..3e09dd1
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IContext;
+
+import java.util.Map;
+import java.util.Vector;
+
+public class Context implements IContext {
+ @SuppressWarnings("rawtypes")
+ private Map storm_conf;
+ private volatile Vector<IConnection> connections;
+
+ /**
+ * initialization per Storm configuration
+ */
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map storm_conf) {
+ this.storm_conf = storm_conf;
+ connections = new Vector<IConnection>();
+ }
+
+ /**
+ * establish a server with a binding port
+ */
+ public IConnection bind(String storm_id, int port) {
+ IConnection server = new Server(storm_conf, port);
+ connections.add(server);
+ return server;
+ }
+
+ /**
+ * establish a connection to a remote server
+ */
+ public IConnection connect(String storm_id, String host, int port) {
+ IConnection client = new Client(storm_conf, host, port);
+ connections.add(client);
+ return client;
+ }
+
+ /**
+ * terminate this context
+ */
+ public void term() {
+ for (IConnection conn : connections) {
+ conn.close();
+ }
+ connections = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index 4cc2040,0000000..a552cf7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+enum ControlMessage {
+ CLOSE_MESSAGE((short)-100),
+ EOB_MESSAGE((short)-201),
+ OK_RESPONSE((short)-200),
+ FAILURE_RESPONSE((short)-400);
+
+ private short code;
+
+ //private constructor
+ private ControlMessage(short code) {
+ this.code = code;
+ }
+
+ /**
+ * Return a control message per an encoded status code
+ * @param encoded
+ * @return
+ */
+ static ControlMessage mkMessage(short encoded) {
+ for(ControlMessage cm: ControlMessage.values()) {
+ if(encoded == cm.code) return cm;
+ }
+ return null;
+ }
+
+ int encodeLength() {
+ return 2; //short
+ }
+
+ /**
+ * encode the current Control Message into a channel buffer
+ * @throws Exception
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
+ write(bout);
+ bout.close();
+ return bout.buffer();
+ }
+
+ void write(ChannelBufferOutputStream bout) throws Exception {
+ bout.writeShort(code);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index a9d46a2,0000000..9d287e4
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@@ -1,151 -1,0 +1,168 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+ private int buffer_size;
+ private ArrayList<Object> msgs;
+ private int encoded_length;
+
+ MessageBatch(int buffer_size) {
+ this.buffer_size = buffer_size;
+ msgs = new ArrayList<Object>();
+ encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+ }
+
+ void add(Object obj) {
+ if (obj == null)
+ throw new RuntimeException("null object forbidded in message batch");
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.add(msg);
+ encoded_length += msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.add(msg);
+ encoded_length += msg.encodeLength();
+ return;
+ }
+
+ throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
+ }
+
+ void remove(Object obj) {
+ if (obj == null) return;
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msg.encodeLength();
+ return;
+ }
+ }
+
+ Object get(int index) {
+ return msgs.get(index);
+ }
+
+ /**
+ * try to add a TaskMessage to a batch
+ * @param taskMsg
+ * @return false if the msg could not be added due to buffer size limit; true otherwise
+ */
+ boolean tryAdd(TaskMessage taskMsg) {
+ if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
+ return false;
+ add(taskMsg);
+ return true;
+ }
+
+ private int msgEncodeLength(TaskMessage taskMsg) {
+ if (taskMsg == null) return 0;
+
+ int size = 6; //INT + SHORT
+ if (taskMsg.message() != null)
+ size += taskMsg.message().length;
+ return size;
+ }
+
+ /**
+ * Has this batch used up allowed buffer size
+ * @return
+ */
+ boolean isFull() {
+ return encoded_length >= buffer_size;
+ }
+
+ /**
+ * true if this batch doesn't have any messages
+ * @return
+ */
+ boolean isEmpty() {
+ return msgs.isEmpty();
+ }
+
+ /**
+ * # of msgs in this batch
+ * @return
+ */
+ int size() {
+ return msgs.size();
+ }
+
+ /**
+ * create a buffer containing the encoding of this batch
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+
+ for (Object msg : msgs)
+ if (msg instanceof TaskMessage)
+ writeTaskMessage(bout, (TaskMessage)msg);
+ else
+ ((ControlMessage)msg).write(bout);
+
+ //add a END_OF_BATCH indicator
+ ControlMessage.EOB_MESSAGE.write(bout);
+
+ bout.close();
+
+ return bout.buffer();
+ }
+
+ /**
+ * write a TaskMessage into a stream
+ *
+ * Each TaskMessage is encoded as:
+ * task ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+ int payload_len = 0;
+ if (message.message() != null)
+ payload_len = message.message().length;
+
+ int task_id = message.task();
+ if (task_id > Short.MAX_VALUE)
+ throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+
+ bout.writeShort((short)task_id);
+ bout.writeInt(payload_len);
+ if (payload_len >0)
+ bout.write(message.message());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 76776a9,0000000..3365e58
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@@ -1,68 -1,0 +1,85 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class MessageDecoder extends FrameDecoder {
+ /*
+ * Each ControlMessage is encoded as:
+ * code (<0) ... short(2)
+ * Each TaskMessage is encoded as:
+ * task (>=0) ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+ // Make sure that we have received at least a short
+ if (buf.readableBytes() < 2) {
+ //need more data
+ return null;
+ }
+
+ // Mark the current buffer position before reading task/len field
+ // because the whole frame might not be in the buffer yet.
+ // We will reset the buffer position to the marked position if
+ // there's not enough bytes in the buffer.
+ buf.markReaderIndex();
+
+ //read the short field
+ short code = buf.readShort();
+
+ //case 1: Control message
+ ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+ if (ctrl_msg != null) return ctrl_msg;
+
+ //case 2: task Message
+ short task = code;
+
+ // Make sure that we have received at least an integer (length)
+ if (buf.readableBytes() < 4) {
+ //need more data
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // Read the length field.
+ int length = buf.readInt();
+ if (length<=0) {
+ return new TaskMessage(task, null);
+ }
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+ // The whole bytes were not received yet - return null.
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // There's enough bytes in the buffer. Read it.
+ ChannelBuffer payload = buf.readBytes(length);
+
+ // Successfully decoded a frame.
+ // Return a TaskMessage object
+ return new TaskMessage(task,payload.array());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index c0ac8f1,0000000..e6e65c3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@@ -1,22 -1,0 +1,39 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+ if (obj instanceof ControlMessage) {
+ return ((ControlMessage)obj).buffer();
+ }
+
+ if (obj instanceof MessageBatch) {
+ return ((MessageBatch)obj).buffer();
+ }
+
+ throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index bf6825c,0000000..ad811b0
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -1,119 -1,0 +1,136 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class Server implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Server.class);
+ @SuppressWarnings("rawtypes")
+ Map storm_conf;
+ int port;
+ private LinkedBlockingQueue<TaskMessage> message_queue;
+ volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
+ final ChannelFactory factory;
+ final ServerBootstrap bootstrap;
+
+ @SuppressWarnings("rawtypes")
+ Server(Map storm_conf, int port) {
+ this.storm_conf = storm_conf;
+ this.port = port;
+ message_queue = new LinkedBlockingQueue<TaskMessage>();
+
+ // Configure the server.
+ int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ServerBootstrap(factory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.receiveBufferSize", buffer_size);
+ bootstrap.setOption("child.keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+
+ // Bind and start to accept incoming connections.
+ Channel channel = bootstrap.bind(new InetSocketAddress(port));
+ allChannels.add(channel);
+ }
+
+ /**
+ * enqueue a received message
+ * @param message
+ * @throws InterruptedException
+ */
+ protected void enqueue(TaskMessage message) throws InterruptedException {
+ message_queue.put(message);
+ LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
+ }
+
+ /**
+ * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
+ */
+ public TaskMessage recv(int flags) {
+ if ((flags & 0x01) == 0x01) {
+ //non-blocking
+ return message_queue.poll();
+ } else {
+ try {
+ TaskMessage request = message_queue.take();
+ LOG.debug("request to be processed: {}", request);
+ return request;
+ } catch (InterruptedException e) {
+ LOG.info("exception within msg receiving", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * register a newly created channel
+ * @param channel
+ */
+ protected void addChannel(Channel channel) {
+ allChannels.add(channel);
+ }
+
+ /**
+ * close a channel
+ * @param channel
+ */
+ protected void closeChannel(Channel channel) {
+ channel.close().awaitUninterruptibly();
+ allChannels.remove(channel);
+ }
+
+ /**
+ * close all channels, and release resources
+ */
+ public synchronized void close() {
+ if (allChannels != null) {
+ allChannels.close().awaitUninterruptibly();
+ factory.releaseExternalResources();
+ allChannels = null;
+ }
+ }
+
+ public void send(int task, byte[] message) {
+ throw new RuntimeException("Server connection should not send any messages");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 6fbfb1c,0000000..65c36a7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -1,104 -1,0 +1,121 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+ private Client client;
+ private AtomicBoolean being_closed;
+ long start_time;
+
+ StormClientHandler(Client client) {
+ this.client = client;
+ being_closed = new AtomicBoolean(false);
+ start_time = System.currentTimeMillis();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
+ //register the newly established channel
+ Channel channel = event.getChannel();
+ client.setChannel(channel);
+ LOG.debug("connection established to a remote host");
+
+ //send next request
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+ LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
+
+ //examine the response message from server
+ ControlMessage msg = (ControlMessage)event.getMessage();
+ if (msg==ControlMessage.FAILURE_RESPONSE)
+ LOG.info("failure response:{}", msg);
+
+ //send next request
+ Channel channel = event.getChannel();
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ /**
+ * Retrieve a request from message queue, and send to server
+ * @param channel
+ */
+ private void sendRequests(Channel channel, final MessageBatch requests) {
+ if (requests==null || requests.size()==0 || being_closed.get()) return;
+
+ //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
+ Object last_msg = requests.get(requests.size()-1);
+ if (last_msg==ControlMessage.CLOSE_MESSAGE) {
+ being_closed.set(true);
+ requests.remove(last_msg);
+ }
+
+ //we may don't need do anything if no requests found
+ if (requests.isEmpty()) {
+ if (being_closed.get())
+ client.close_n_release();
+ return;
+ }
+
+ //write request into socket channel
+ ChannelFuture future = channel.write(requests);
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (!future.isSuccess()) {
+ LOG.info("failed to send requests:", future.getCause());
+ future.getChannel().close();
+ } else {
+ LOG.debug("{} request(s) sent", requests.size());
+ }
+ if (being_closed.get())
+ client.close_n_release();
+ }
+ });
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+ Throwable cause = event.getCause();
+ if (!(cause instanceof ConnectException)) {
+ LOG.info("Connection failed:", cause);
+ }
+ if (!being_closed.get()) {
+ client.setChannel(null);
+ client.reconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 91c513a,0000000..6bad8e3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@@ -1,27 -1,0 +1,44 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+class StormClientPipelineFactory implements ChannelPipelineFactory {
+ private Client client;
+
+ StormClientPipelineFactory(Client client) {
+ this.client = client;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormClientHandler(client));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 9a5aaed,0000000..093fb61
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@@ -1,53 -1,0 +1,70 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class StormServerHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+ Server server;
+ private AtomicInteger failure_count;
+
+ StormServerHandler(Server server) {
+ this.server = server;
+ failure_count = new AtomicInteger(0);
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ server.addChannel(e.getChannel());
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ Object msg = e.getMessage();
+ if (msg == null) return;
+
+ //end of batch?
+ if (msg==ControlMessage.EOB_MESSAGE) {
+ Channel channel = ctx.getChannel();
+ LOG.debug("Send back response ...");
+ if (failure_count.get()==0)
+ channel.write(ControlMessage.OK_RESPONSE);
+ else channel.write(ControlMessage.FAILURE_RESPONSE);
+ return;
+ }
+
+ //enqueue the received message for processing
+ try {
+ server.enqueue((TaskMessage)msg);
+ } catch (InterruptedException e1) {
+ LOG.info("failed to enqueue a request message", e);
+ failure_count.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ server.closeChannel(e.getChannel());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index 56b0834,0000000..df29ba8
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@@ -1,28 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+class StormServerPipelineFactory implements ChannelPipelineFactory {
+ private Server server;
+
+ StormServerPipelineFactory(Server server) {
+ this.server = server;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index eefcb48,0000000..0c908c5
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@@ -1,44 -1,0 +1,59 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns backtype.storm.messaging.netty-integration-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(deftest test-integration
+ (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
+ :daemon-conf {STORM-LOCAL-MODE-ZMQ true
+ STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ }]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+ {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+ :parallelism-hint 6)})
+ results (complete-topology cluster
+ topology
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ :storm-conf {TOPOLOGY-WORKERS 3}
+ :mock-sources {"1" [["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ]}
+ )]
+ (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+ (read-tuples results "2"))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 12ebe5d,0000000..20914ef
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -1,97 -1,0 +1,112 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns backtype.storm.messaging.netty-unit-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(def port 6700)
+(def task 1)
+
+(deftest test-basic
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-large-msg
+ (let [req_msg (apply str (repeat 2048000 'c'))
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-server-delayed
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ _ (Thread/sleep 1000)
+ server (.bind context nil port)
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-batch
+ (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)]
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)]
+ (.send client task (.getBytes req_msg))))
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)
+ resp (.recv server 0)
+ resp_msg (String. (.message resp))]
+ (is (= req_msg resp_msg))))
+ (.close client)
+ (.close server)
+ (.term context)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
[4/4] git commit: Merge branch 'netty-default'
Posted by pt...@apache.org.
Merge branch 'netty-default'
Conflicts:
storm-netty/project.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/d79f6b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/d79f6b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/d79f6b80
Branch: refs/heads/master
Commit: d79f6b808cd294b15ec06e41f60b9dbbf1d3986b
Parents: d3414f5 962d520
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Dec 20 21:23:47 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Dec 20 21:23:47 2013 -0500
----------------------------------------------------------------------
MODULES | 1 -
bin/install_zmq.sh | 48 ----
conf/defaults.yaml | 2 +-
storm-core/project.clj | 2 +-
.../src/clj/backtype/storm/messaging/zmq.clj | 108 ---------
storm-core/src/clj/backtype/storm/testing.clj | 2 +-
storm-core/src/clj/zilch/mq.clj | 119 ----------
.../backtype/storm/messaging/netty/Client.java | 221 ++++++++++++++++++
.../backtype/storm/messaging/netty/Context.java | 67 ++++++
.../storm/messaging/netty/ControlMessage.java | 67 ++++++
.../storm/messaging/netty/MessageBatch.java | 168 ++++++++++++++
.../storm/messaging/netty/MessageDecoder.java | 85 +++++++
.../storm/messaging/netty/MessageEncoder.java | 39 ++++
.../backtype/storm/messaging/netty/Server.java | 136 ++++++++++++
.../messaging/netty/StormClientHandler.java | 121 ++++++++++
.../netty/StormClientPipelineFactory.java | 44 ++++
.../messaging/netty/StormServerHandler.java | 70 ++++++
.../netty/StormServerPipelineFactory.java | 45 ++++
.../storm/messaging/netty_integration_test.clj | 59 +++++
.../storm/messaging/netty_unit_test.clj | 112 ++++++++++
.../test/clj/backtype/storm/messaging_test.clj | 2 +-
storm-core/test/clj/zilch/test/mq.clj | 101 ---------
storm-netty/project.clj | 28 ---
.../backtype/storm/messaging/netty/Client.java | 222 -------------------
.../backtype/storm/messaging/netty/Context.java | 67 ------
.../storm/messaging/netty/ControlMessage.java | 68 ------
.../storm/messaging/netty/MessageBatch.java | 170 --------------
.../storm/messaging/netty/MessageDecoder.java | 85 -------
.../storm/messaging/netty/MessageEncoder.java | 39 ----
.../backtype/storm/messaging/netty/Server.java | 137 ------------
.../messaging/netty/StormClientHandler.java | 128 -----------
.../netty/StormClientPipelineFactory.java | 44 ----
.../messaging/netty/StormServerHandler.java | 76 -------
.../netty/StormServerPipelineFactory.java | 45 ----
.../storm/messaging/netty_integration_test.clj | 59 -----
.../storm/messaging/netty_unit_test.clj | 112 ----------
36 files changed, 1238 insertions(+), 1661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/storm-core/project.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/d79f6b80/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------