Posted to commits@storm.apache.org by pt...@apache.org on 2015/02/25 23:09:56 UTC

[1/5] storm git commit: port STORM-329 fix to 0.9.x

Repository: storm
Updated Branches:
  refs/heads/0.9.3-branch a1e5893e1 -> 6b06d8468


port STORM-329 fix to 0.9.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62788f29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62788f29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62788f29

Branch: refs/heads/0.9.3-branch
Commit: 62788f295bb1fb1cc83b99c30f82beb40eea5f25
Parents: a1e5893
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Feb 24 18:03:40 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 18:03:40 2015 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  45 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   2 +-
 .../storm/messaging/ConnectionWithStatus.java   |  32 +
 .../backtype/storm/messaging/netty/Client.java  | 642 +++++++++++++------
 .../backtype/storm/messaging/netty/Server.java  | 122 +++-
 .../netty/StormClientPipelineFactory.java       |   2 +-
 .../storm/messaging/netty_unit_test.clj         |  70 +-
 .../test/clj/backtype/storm/worker_test.clj     |  38 ++
 9 files changed, 725 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0050227..e2b3300 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -109,7 +109,7 @@ zmq.hwm: 0
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
-# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
+# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period also needs to be longer than storm.zookeeper.session.timeout (default: 20s), so that we can abort the reconnection when the target worker is dead.
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
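
The interplay between these settings can be sanity-checked with a little
arithmetic. The sketch below is illustrative only (the class name is
hypothetical) and simplifies the bounded exponential backoff by assuming
every retry sleeps the full max_wait_ms:

    // Rough upper bound on the reconnection window implied by the defaults
    // above; the real client ramps its sleep times up to max_wait_ms via
    // StormBoundedExponentialBackoffRetry.
    public class RetryWindowSketch {
        public static void main(String[] args) {
            int maxRetries = 300; // storm.messaging.netty.max_retries
            int maxWaitMs = 1000; // storm.messaging.netty.max_wait_ms
            long windowMs = (long) maxRetries * maxWaitMs;
            System.out.println("worst-case reconnection window: " + (windowMs / 1000) + " s");
            // ~300 s, which exceeds both the 120 s worker-launch timeout and
            // the 20 s default storm.zookeeper.session.timeout noted above.
        }
    }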

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index f1c4d3a..d4c9467 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -21,7 +21,7 @@
   (:import [java.util ArrayList HashMap])
   (:import [backtype.storm.utils TransferDrainer])
   (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.messaging TaskMessage IContext IConnection])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
   (:gen-class))
 
 (bootstrap)
@@ -201,6 +201,10 @@
       :worker-id worker-id
       :cluster-state cluster-state
       :storm-cluster-state storm-cluster-state
+      ;; When the worker boots up, it will start to set up its initial connections
+      ;; to other workers. When all connections are ready, we will enable this flag,
+      ;; and the spouts and bolts will be activated.
+      :worker-active-flag (atom false)
       :storm-active-atom (atom false)
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
@@ -308,7 +312,7 @@
     (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
      (reset!
       (:storm-active-atom worker)
-      (= :active (-> base :status :type))
+       (and (= :active (-> base :status :type)) @(:worker-active-flag worker))
       ))
      ))
 
@@ -330,6 +334,37 @@
               (.send drainer node+port->socket)))
           (.clear drainer))))))
 
+;; Check whether this messaging connection is ready to send data
+(defn is-connection-ready [^IConnection connection]
+  (if (instance? ConnectionWithStatus connection)
+    (let [^ConnectionWithStatus connection connection
+          status (.status connection)]
+      (= status ConnectionWithStatus$Status/Ready))
+    true))
+
+;; Check whether all connections are ready
+(defn all-connections-ready [worker]
+    (let [connections (vals @(:cached-node+port->socket worker))]
+      (every? is-connection-ready connections)))
+
+;; We will wait for all connections to be ready and then activate the spout/bolt
+;; when the worker boots up.
+(defn activate-worker-when-all-connections-ready
+  [worker]
+  (let [timer (:refresh-active-timer worker)
+        delay-secs 0
+        recur-secs 1]
+    (schedule timer
+      delay-secs
+      (fn this []
+        (if (all-connections-ready worker)
+          (do
+            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+              " with id " (:worker-id worker))
+            (reset! (:worker-active-flag worker) true))
+          (schedule timer recur-secs this :check-active false)
+            )))))
+
 (defn launch-receive-thread [worker]
   (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
   (msg-loader/launch-receive-thread!
@@ -373,14 +408,18 @@
         _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
         _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
 
+        receive-thread-shutdown (launch-receive-thread worker)
         
         refresh-connections (mk-refresh-connections worker)
 
         _ (refresh-connections nil)
+
+        _ (activate-worker-when-all-connections-ready worker)
+
         _ (refresh-storm-active worker nil)
+
  
         _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))
-        receive-thread-shutdown (launch-receive-thread worker)
         
         transfer-tuples (mk-transfer-tuples-handler worker)
         

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index 801f22d..4aa67ab 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -70,4 +70,4 @@
 (defn mk-context [] 
   (let [context  (LocalContext. nil nil)]
     (.prepare ^IContext context nil)
-    context))
\ No newline at end of file
+    context))

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
new file mode 100644
index 0000000..38abc19
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
@@ -0,0 +1,32 @@
+package backtype.storm.messaging;
+
+public abstract class ConnectionWithStatus implements IConnection {
+
+  public static enum Status {
+
+    /**
+     * We are establishing an active connection with the target host. New data
+     * sending requests can be buffered for future sending, or dropped (e.g., when
+     * there is not enough memory). This behavior varies across IConnection
+     * implementations.
+     */
+    Connecting,
+
+    /**
+     * We have a live connection channel that can be used to transfer data.
+     */
+    Ready,
+
+    /**
+     * The connection channel is closed or being closed. We don't accept further
+     * data sending or receiving. All data sending requests will be dropped.
+     */
+    Closed
+  };
+
+  /**
+   * Whether this connection is available to transfer data.
+   */
+  public abstract Status status();
+
+}
\ No newline at end of file
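
To make the intended lifecycle concrete, here is an illustrative stub
subclass (not part of this commit) showing the expected progression from
Connecting to Ready to Closed. It assumes IConnection declares only the
send/recv/close methods visible elsewhere in this diff; the real
implementations are the Netty Client and Server below.

    import java.util.Iterator;

    import backtype.storm.messaging.ConnectionWithStatus;
    import backtype.storm.messaging.TaskMessage;

    public class StubConnection extends ConnectionWithStatus {

        private volatile Status status = Status.Connecting;

        public void markReady() {
            status = Status.Ready;
        }

        @Override
        public Status status() {
            return status;
        }

        public Iterator<TaskMessage> recv(int flags, int clientId) {
            return null; // a client-side connection receives nothing
        }

        public void send(int taskId, byte[] payload) {
            // a real implementation would buffer or drop unless Ready
        }

        public void send(Iterator<TaskMessage> msgs) {
            // same as above
        }

        public void close() {
            status = Status.Closed;
        }
    }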

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c516b63..ae8a8d5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -30,317 +30,601 @@ import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class Client implements IConnection {
+import com.google.common.util.concurrent.*;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
+import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
+import backtype.storm.utils.Utils;
+
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *       the remote destination is currently unavailable.
+ * - A flusher thread runs in the background.  It will, at fixed intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
+    private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
+    private final InetSocketAddress dstAddress;
+    protected final String dstAddressPrefixedName;
+
+    /**
+     * The channel used for all write operations from this client to the remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+    /**
+     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+     */
+    private final int maxReconnectionAttempts;
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * This flag is set to true if and only if a client instance is being closed.
+     */
+    private volatile boolean closing = false;
+
+    /**
+     * When set to true, then the background flusher thread will flush any pending messages on its next run.
+     */
+    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
     
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private boolean closing;
+    /**
+     * The absolute time (in ms) when the next background flush should be performed.
+     *
+     * Note: The flush operation will only be performed if backgroundFlushingEnabled is true, too.
+     */
+    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
 
-    private int messageBatchSize;
+    /**
+     * The time interval (in ms) at which the background flusher thread will be run to check for any pending messages
+     * to be flushed.
+     */
+    private final int flushCheckIntervalMs;
     
-    private AtomicLong pendings;
+    /**
+     * How many messages should be batched together before sending them to the remote destination.
+     *
+     * Messages are batched to optimize network throughput at the expense of latency.
+     */
+    private final int messageBatchSize;
 
-    MessageBatch messageBatch = null;
-    private AtomicLong flushCheckTimer;
-    private int flushCheckInterval;
-    private ScheduledExecutorService scheduler;
+    private MessageBatch messageBatch = null;
+    private final ListeningScheduledExecutorService scheduler;
+    protected final Map stormConf;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, 
-            ScheduledExecutorService scheduler, String host, int port) {
-        this.factory = factory;
-        this.scheduler = scheduler;
-        channelRef = new AtomicReference<Channel>(null);
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
-        pendings = new AtomicLong(0);
-        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
-
-        // Configure
-        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
-        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);
-
-        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        this.stormConf = stormConf;
+        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
+        int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+        messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+
+        maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+
+        // Initiate connection to remote destination
+        bootstrap = createClientBootstrap(factory, bufferSize);
+        dstAddress = new InetSocketAddress(host, port);
+        dstAddressPrefixedName = prefixedName(dstAddress);
+        connect(NO_DELAY_MS);
         
-        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
-
-        LOG.info("New Netty Client, connect to " + host + ", " + port
-                + ", config: " + ", buffer_size: " + buffer_size);
+        // Launch background flushing thread
+        pauseBackgroundFlushing();
+        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
+        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
+            TimeUnit.MILLISECONDS);
+    }
 
-        bootstrap = new ClientBootstrap(factory);
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", buffer_size);
+        bootstrap.setOption("sendBufferSize", bufferSize);
         bootstrap.setOption("keepAlive", true);
-
-        // Set up the pipeline factory.
         bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+        return bootstrap;
+    }
 
-        // Start the connection attempt.
-        remote_addr = new InetSocketAddress(host, port);
+    private String prefixedName(InetSocketAddress dstAddress) {
+        if (null != dstAddress) {
+            return PREFIX + dstAddress.toString();
+        }
+        return "";
+    }
         
-        // setup the connection asyncly now
-        scheduler.execute(new Runnable() {
+    private Runnable createBackgroundFlusher() {
+        return new Runnable() {
             @Override
             public void run() {   
-                connect();
+                if(!closing && backgroundFlushingEnabled.get() && nowMillis() > nextBackgroundFlushTimeMs.get()) {
+                    LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
+                        dstAddressPrefixedName);
+                    flushPendingMessages();
+                }
+            }
+        };
+    }
+
+    private void pauseBackgroundFlushing() {
+        backgroundFlushingEnabled.set(false);
             }
-        });
         
-        Runnable flusher = new Runnable() {
-            @Override
-            public void run() {
+    private void resumeBackgroundFlushing() {
+        backgroundFlushingEnabled.set(true);
+    }
 
-                if(!closing) {
-                    long flushCheckTime = flushCheckTimer.get();
-                    long now = System.currentTimeMillis();
-                    if (now > flushCheckTime) {
+    private synchronized void flushPendingMessages() {
                         Channel channel = channelRef.get();
-                        if (null != channel && channel.isWritable()) {
-                            flush(channel);
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel)) {
+                if (channel.isWritable()) {
+                    pauseBackgroundFlushing();
+                    MessageBatch toBeFlushed = messageBatch;
+                    flushMessages(channel, toBeFlushed);
+                    messageBatch = null;
+                }
+                else if (closing) {
+                    // Ensure background flushing is enabled so that we definitely have a chance to re-try the flush
+                    // operation in case the client is being gracefully closed (where we have a brief time window where
+                    // the client will wait for pending messages to be sent).
+                    resumeBackgroundFlushing();
+                }
                         }
+            else {
+                closeChannelAndReconnect(channel);
                     }
                 }
-                
             }
-        };
         
-        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
-        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
+    private long nowMillis() {
+        return System.currentTimeMillis();
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    private synchronized void connect() {
+    private synchronized void connect(long delayMs) {
         try {
+            if (closing) {
+                return;
+            }
 
-            Channel channel = channelRef.get();
-            if (channel != null && channel.isConnected()) {
+            if (connectionEstablished(channelRef.get())) {
                 return;
             }
 
-            int tried = 0;
-            //setting channel to null to make sure we throw an exception when reconnection fails
-            channel = null;
-            while (tried <= max_retries) {
+            connectionAttempts.getAndIncrement();
+            if (reconnectingAllowed()) {
+                totalConnectionAttempts.getAndIncrement();
+                LOG.info("connection attempt {} to {} scheduled to run in {} ms", connectionAttempts.get(),
+                    dstAddressPrefixedName, delayMs);
+                ListenableFuture<Channel> channelFuture = scheduler.schedule(
+                    new Connector(dstAddress, connectionAttempts.get()), delayMs, TimeUnit.MILLISECONDS);
+                Futures.addCallback(channelFuture, new FutureCallback<Channel>() {
+                    @Override public void onSuccess(Channel result) {
+                        if (connectionEstablished(result)) {
+                            setChannel(result);
+                            LOG.info("connection established to {}", dstAddressPrefixedName);
+                            connectionAttempts.set(0);
+                        }
+                        else {
+                            reconnectAgain(new RuntimeException("Returned channel was actually not established"));
+                        }
+                    }
 
-                LOG.info("Reconnect started for {}... [{}]", name(), tried);
-                LOG.debug("connection started...");
+                    @Override public void onFailure(Throwable t) {
+                        reconnectAgain(t);
+                    }
 
-                ChannelFuture future = bootstrap.connect(remote_addr);
-                future.awaitUninterruptibly();
-                Channel current = future.getChannel();
-                if (!future.isSuccess()) {
-                    if (null != current) {
-                        current.close();
+                    private void reconnectAgain(Throwable t) {
+                        String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                            dstAddressPrefixedName);
+                        String failureMsg = (t == null)? baseMsg : baseMsg + ": " + t.toString();
+                        LOG.error(failureMsg);
+                        long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+                        connect(nextDelayMs);
                     }
-                } else {
-                    channel = current;
-                    break;
+                });
+            }
+            else {
+                close();
+                throw new RuntimeException("Giving up to connect to " + dstAddressPrefixedName + " after " +
+                    connectionAttempts + " failed attempts");
+            }
+                    }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to connect to " + dstAddressPrefixedName, e);
                 }
-                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
-                tried++;  
             }
-            if (null != channel) {
-                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
+
+    private void setChannel(Channel channel) {
                 channelRef.set(channel);
-            } else {
-                close();
-                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
+    }
+
+    private boolean reconnectingAllowed() {
+        return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
+        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
+        // anything can be read or written to the channel.
+        //
+        // See:
+        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+        return channel != null && channel.isConnected();
+    }
+
+    /**
+     * Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a
+     * topology. The worker will only be activated once all of its connections are ready.
+     */
+    @Override
+    public Status status() {
+        if (closing) {
+            return Status.Closed;
+        }
+        else if (!connectionEstablished(channelRef.get())) {
+            return Status.Connecting;
             }
-        } catch (InterruptedException e) {
-            throw new RuntimeException("connection failed " + name(), e);
+        else {
+            return Status.Ready;
         }
     }
 
     /**
-     * Enqueue task messages to be sent to server
+     * Receiving messages is not supported by a client.
+     *
+     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
      */
-    synchronized public void send(Iterator<TaskMessage> msgs) {
+    @Override
+    public Iterator<TaskMessage> recv(int flags, int clientId) {
+        throw new UnsupportedOperationException("Client connection should not receive any messages");
+    }
+
+    @Override
+    public void send(int taskId, byte[] payload) {
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
+    }
 
-        // throw exception if the client is being closed
+    /**
+     * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
+     */
+    @Override
+    public synchronized void send(Iterator<TaskMessage> msgs) {
         if (closing) {
-            throw new RuntimeException("Client is being closed, and does not take requests any more");
+            int numMessages = iteratorSize(msgs);
+            LOG.warn("discarding {} messages because the Netty client to {} is being closed", numMessages,
+                dstAddressPrefixedName);
+            return;
         }
         
-        if (null == msgs || !msgs.hasNext()) {
+        if (!hasMessages(msgs)) {
             return;
         }
 
         Channel channel = channelRef.get();
-        if (null == channel) {
-            connect();
-            channel = channelRef.get();
+        if (!connectionEstablished(channel)) {
+            // Closing the channel and reconnecting should be done before handling the messages.
+            closeChannelAndReconnect(channel);
+            handleMessagesWhenConnectionIsUnavailable(msgs);
+            return;
         }
 
+        // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
-            if (!channel.isConnected()) {
-                connect();
-                channel = channelRef.get();
-            }
             TaskMessage message = msgs.next();
-            if (null == messageBatch) {
+            if (messageBatch == null) {
                 messageBatch = new MessageBatch(messageBatchSize);
             }
 
             messageBatch.add(message);
             if (messageBatch.isFull()) {
                 MessageBatch toBeFlushed = messageBatch;
-                flushRequest(channel, toBeFlushed);
+                flushMessages(channel, toBeFlushed);
                 messageBatch = null;
             }
         }
 
-        if (null != messageBatch && !messageBatch.isEmpty()) {
-            if (channel.isWritable()) {
-                flushCheckTimer.set(Long.MAX_VALUE);
-                
-                // Flush as fast as we can to reduce the latency
+        // Handle any remaining messages in case the "last" batch was not full.
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel) && channel.isWritable()) {
+                // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
+                pauseBackgroundFlushing();
                 MessageBatch toBeFlushed = messageBatch;
                 messageBatch = null;
-                flushRequest(channel, toBeFlushed);
+                flushMessages(channel, toBeFlushed);
+            }
+            else {
+                // We cannot write to the channel, which means Netty's internal write buffer is full.
+                // In this case, we buffer the remaining messages and wait for the next messages to arrive.
+                //
+                // Background:
+                // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
+                // This represents the amount of data waiting to be flushed to operating system buffers.  If the
+                // outstanding data exceeds this value then the channel is set to non-writable.  When this happens, an
+                // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
+                // has been flushed to the system buffers.
+                //
+                // See http://stackoverflow.com/questions/14049260
+                resumeBackgroundFlushing();
+                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
+            }
+        }
                 
-            } else {
-                // when channel is NOT writable, it means the internal netty buffer is full. 
-                // In this case, we can try to buffer up more incoming messages.
-                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
+
+    private boolean hasMessages(Iterator<TaskMessage> msgs) {
+        return msgs != null && msgs.hasNext();
         }
 
+    /**
+     * We will drop pending messages and let at-least-once message replay kick in.
+     *
+     * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
+     * especially for topologies that disable message acking because we don't know whether the connection recovery will
+     * succeed or not, and how long the recovery will take.
+     */
+    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
+        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+        dropPendingMessages(msgs);
     }
 
-    public String name() {
-        if (null != remote_addr) {
-            return PREFIX + remote_addr.toString();
-        }
-        return "";
+    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+        // We consume the iterator by traversing and thus "emptying" it.
+        int msgCount = iteratorSize(msgs);
+        LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
-    private synchronized void flush(Channel channel) {
-        if (!closing) {
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                flushCheckTimer.set(Long.MAX_VALUE);
-                flushRequest(channel, toBeFlushed);
-                messageBatch = null;
+    private int iteratorSize(Iterator<TaskMessage> msgs) {
+        int size = 0;
+        if (msgs != null) {
+            while (msgs.hasNext()) {
+                size++;
+                msgs.next();
             }
         }
+        return size;
     }
     
     /**
-     * gracefully close this client.
+     * Asynchronously writes the message batch to the channel.
      * 
-     * We will send all existing requests, and then invoke close_n_release()
-     * method
+     * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    public synchronized void close() {
-        if (!closing) {
-            closing = true;
-            LOG.info("Closing Netty Client " + name());
+    private synchronized void flushMessages(Channel channel, final MessageBatch batch) {
+        if (!containsMessages(batch)) {
+            return;
+        }
+
+        final int numMessages = batch.size();
+        pendingMessages.getAndAdd(numMessages);
+        LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
+        ChannelFuture future = channel.write(batch);
+        future.addListener(new ChannelFutureListener() {
+
+            public void operationComplete(ChannelFuture future) throws Exception {
+                pendingMessages.getAndAdd(0 - numMessages);
+                if (future.isSuccess()) {
+                    LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+                    messagesSent.getAndAdd(batch.size());
+                }
+                else {
+                    LOG.warn("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                        future.getCause());
+                    closeChannelAndReconnect(future.getChannel());
+                    messagesLost.getAndAdd(numMessages);
+                }
+            }
+
+        });
+    }
             
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                Channel channel = channelRef.get();
+    private synchronized void closeChannelAndReconnect(Channel channel) {
                 if (channel != null) {
-                    flushRequest(channel, toBeFlushed);
+            channel.close();
+            if (channelRef.compareAndSet(channel, null)) {
+                connect(NO_DELAY_MS);
+            }
                 }
-                messageBatch = null;
             }
         
-            //wait for pendings to exit
-            final long timeoutMilliSeconds = 600 * 1000; //600 seconds
-            final long start = System.currentTimeMillis();
+    private boolean containsMessages(MessageBatch batch) {
+        return batch != null && !batch.isEmpty();
+    }
             
-            LOG.info("Waiting for pending batchs to be sent with "+ name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
+    /**
+     * Gracefully close this client.
+     *
+     * We will attempt to send any pending messages (i.e. messages currently buffered in memory) before closing the
+     * client.
+     */
+    @Override
+    public void close() {
+        if (!closing) {
+            LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+            // Set closing to true to prevent any further reconnection attempts.
+            closing = true;
+            flushPendingMessages();
+            waitForPendingMessagesToBeSent();
+            closeChannel();
+        }
+    }
             
-            while(pendings.get() != 0) {
+    private synchronized void waitForPendingMessagesToBeSent() {
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+            PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+        long totalPendingMsgs = pendingMessages.get();
+        long startMs = nowMillis();
+        while (pendingMessages.get() != 0) {
                 try {
-                    long delta = System.currentTimeMillis() - start;
-                    if (delta > timeoutMilliSeconds) {
-                        LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
+                long deltaMs = nowMillis() - startMs;
+                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
+                        "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                         break;
+                }
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
                     }
-                    Thread.sleep(1000); //sleep 1s
-                } catch (InterruptedException e) {
+            catch (InterruptedException e) {
                     break;
                 } 
             }
             
-            close_n_release();
-        }
     }
 
-    /**
-     * close_n_release() is invoked after all messages have been sent.
-     */
-    private void close_n_release() {
+    private synchronized void closeChannel() {
         if (channelRef.get() != null) {
             channelRef.get().close();
-            LOG.debug("channel {} closed",remote_addr);
+            LOG.debug("channel to {} closed", dstAddressPrefixedName);
         }
     }
 
     @Override
-    public Iterator<TaskMessage> recv(int flags, int clientId) {
-        throw new RuntimeException("Client connection should not receive any messages");
+    public Object getState() {
+        LOG.info("Getting metrics for client connection to {}", dstAddressPrefixedName);
+        HashMap<String, Object> ret = new HashMap<String, Object>();
+        ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
+        ret.put("sent", messagesSent.getAndSet(0));
+        ret.put("pending", pendingMessages.get());
+        ret.put("lostOnSend", messagesLost.getAndSet(0));
+        ret.put("dest", dstAddress.toString());
+        String src = srcAddressName();
+        if (src != null) {
+            ret.put("src", src);
+        }
+        return ret;
     }
 
-    @Override
-    public void send(int taskId, byte[] payload) {
-        TaskMessage msg = new TaskMessage(taskId, payload);
-        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
-        wrapper.add(msg);
-        send(wrapper.iterator());
+    private String srcAddressName() {
+        String name = null;
+        Channel c = channelRef.get();
+        if (c != null) {
+            SocketAddress address = c.getLocalAddress();
+            if (address != null) {
+                name = address.toString();
+            }
+        }
+        return name;
     }
 
-    private void flushRequest(Channel channel, final MessageBatch requests) {
-        if (requests == null)
-            return;
+    @Override public String toString() {
+        return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
+    }
 
-        pendings.incrementAndGet();
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+    /**
+     * Asynchronously establishes a Netty connection to the remote address, returning a Netty Channel on success.
+     */
+    private class Connector implements Callable<Channel> {
 
-                pendings.decrementAndGet();
-                if (!future.isSuccess()) {
-                    LOG.info(
-                            "failed to send requests to " + remote_addr.toString() + ": ", future.getCause());
+        private final InetSocketAddress address;
+        private final int connectionAttempt;
 
-                    Channel channel = future.getChannel();
+        public Connector(InetSocketAddress address, int connectionAttempt) {
+            this.address = address;
+            if (connectionAttempt < 1) {
+                throw new IllegalArgumentException("connection attempt must be >= 1 (you provided " +
+                    connectionAttempt + ")");
+            }
+            this.connectionAttempt = connectionAttempt;
+        }
 
-                    if (null != channel) {
-                        channel.close();
-                        channelRef.compareAndSet(channel, null);
+        @Override public Channel call() throws Exception {
+            LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+            Channel channel = null;
+            ChannelFuture future = bootstrap.connect(address);
+            future.awaitUninterruptibly();
+            Channel current = future.getChannel();
+
+            if (future.isSuccess() && connectionEstablished(current)) {
+                channel = current;
+                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), channel.toString(),
+                    connectionAttempt);
                     }
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
+            else {
+                LOG.debug("failed to connect to {} [attempt {}]", address.toString(), connectionAttempt);
+                if (current != null) {
+                    current.close();
                 }
             }
-        });
+            return channel;
     }
 }
 
+}
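
The reconnect delays used above come from StormBoundedExponentialBackoffRetry;
both the constructor and the getSleepTimeMs(attempt, 0) call are visible in
this diff. As a hedged illustration (the class name is hypothetical, the
settings are the defaults from conf/defaults.yaml), the schedule can be
printed like this:

    import backtype.storm.utils.StormBoundedExponentialBackoffRetry;

    public class BackoffSchedule {
        public static void main(String[] args) {
            int minWaitMs = 100;  // storm.messaging.netty.min_wait_ms
            int maxWaitMs = 1000; // storm.messaging.netty.max_wait_ms
            int maxRetries = 300; // storm.messaging.netty.max_retries
            StormBoundedExponentialBackoffRetry retry =
                new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxRetries);
            // Print the first ten sleep intervals; they grow exponentially
            // from minWaitMs and are capped at maxWaitMs.
            for (int attempt = 1; attempt <= 10; attempt++) {
                System.out.println("attempt " + attempt + " -> sleep "
                    + retry.getSleepTimeMs(attempt, 0) + " ms");
            }
        }
    }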

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 2499e65..5598a39 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -37,15 +37,37 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.utils.Utils;
+
+class Server extends ConnectionWithStatus implements IStatefulObject {
 
-class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
+    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String, AtomicInteger>();
+    private final AtomicInteger messagesDequeued = new AtomicInteger(0);
+    private final AtomicInteger[] pendingMessages;
     
     // Create multiple queues for incoming messages. The size equals the number of receiver threads.
    // Messages sent to the same task are stored in the same queue to preserve message order.
@@ -59,7 +81,7 @@ class Server implements IConnection {
     private volatile HashMap<Integer, Integer> taskToQueueId = null;
     int roundRobinQueueId;
 	
-    boolean closing = false;
+    private volatile boolean closing = false;
     List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
     
     
@@ -73,8 +95,10 @@ class Server implements IConnection {
         taskToQueueId = new HashMap<Integer, Integer>();
     
         message_queue = new LinkedBlockingQueue[queueCount];
+        pendingMessages = new AtomicInteger[queueCount];
         for (int i = 0; i < queueCount; i++) {
             message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+            pendingMessages[i] = new AtomicInteger(0);
         }
         
         // Configure the server.
@@ -149,13 +173,30 @@ class Server implements IConnection {
       return queueId;
     }
 
+    private void addReceiveCount(String from, int amount) {
+        // This is possibly lossy in the case where a value is deleted
+        // because it has received no messages over the metrics collection
+        // period and new messages are starting to come in.  This is
+        // because I don't want the overhead of synchronization just to have
+        // the metric be absolutely perfect.
+        AtomicInteger i = messagesEnqueued.get(from);
+        if (i == null) {
+            i = new AtomicInteger(amount);
+            AtomicInteger prev = messagesEnqueued.putIfAbsent(from, i);
+            if (prev != null) {
+                prev.addAndGet(amount);
+            }
+        } else {
+            i.addAndGet(amount);
+        }
+    }
+
+
     /**
      * enqueue a received message 
-     * @param message
      * @throws InterruptedException
      */
     protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
-      
       if (null == msgs || msgs.size() == 0 || closing) {
         return;
       }
@@ -170,11 +211,12 @@ class Server implements IConnection {
         ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
         if (null != msgGroup) {
           message_queue[receiverId].put(msgGroup);
+                pendingMessages[receiverId].addAndGet(msgGroup.size());
         }
       }
     }
     
-    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
+    public Iterator<TaskMessage> recv(int flags, int receiverId) {
       if (closing) {
         return closeMessage.iterator();
       }
@@ -184,18 +226,22 @@ class Server implements IConnection {
       if ((flags & 0x01) == 0x01) { 
             //non-blocking
             ret = message_queue[queueId].poll();
-        } else {
+        }
+        else {
             try {
                 ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
                 ret = request;
-            } catch (InterruptedException e) {
+            }
+            catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
                 ret = null;
             }
         }
       
       if (null != ret) {
+            messagesDequeued.addAndGet(ret.size());
+            pendingMessages[queueId].addAndGet(0 - ret.size());
         return ret.iterator();
       }
       return null;
@@ -230,14 +276,72 @@ class Server implements IConnection {
     }
 
     public void send(int task, byte[] message) {
-        throw new RuntimeException("Server connection should not send any messages");
+        throw new UnsupportedOperationException("Server connection should not send any messages");
     }
     
     public void send(Iterator<TaskMessage> msgs) {
-      throw new RuntimeException("Server connection should not send any messages");
+      throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 	
     public String name() {
       return "Netty-server-localhost-" + port;
     }
+
+    @Override
+    public Status status() {
+        if (closing) {
+          return Status.Closed;
+        }
+        else if (!connectionEstablished(allChannels)) {
+            return Status.Connecting;
+        }
+        else {
+            return Status.Ready;
+        }
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+      return channel != null && channel.isBound();
+    }
+
+    private boolean connectionEstablished(ChannelGroup allChannels) {
+        boolean allEstablished = true;
+        for (Channel channel : allChannels) {
+            if (!(connectionEstablished(channel))) {
+                allEstablished = false;
+                break;
+            }
+        }
+        return allEstablished;
+    }
+
+    public Object getState() {
+        LOG.info("Getting metrics for server on port {}", port);
+        HashMap<String, Object> ret = new HashMap<String, Object>();
+        ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
+        ArrayList<Integer> pending = new ArrayList<Integer>(pendingMessages.length);
+        for (AtomicInteger p: pendingMessages) {
+            pending.add(p.get());
+        }
+        ret.put("pending", pending);
+        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+        Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, AtomicInteger> ent = it.next();
+            //Yes we can delete something that is not 0 because of races, but that is OK for metrics
+            AtomicInteger i = ent.getValue();
+            if (i.get() == 0) {
+                it.remove();
+            } else {
+                enqueued.put(ent.getKey(), i.getAndSet(0));
+            }
+        }
+        ret.put("enqueued", enqueued);
+        return ret;
+    }
+
+    @Override public String toString() {
+       return String.format("Netty server listening on port %s", port);
+    }
+
 }
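
The addReceiveCount method above uses a common lock-free pattern for per-key
counters: create a counter optimistically and, if the putIfAbsent race is
lost, update the winner instead of synchronizing. Extracted as a standalone
sketch (the class name is hypothetical):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PerKeyCounters {

        private final ConcurrentHashMap<String, AtomicInteger> counts =
            new ConcurrentHashMap<String, AtomicInteger>();

        public void add(String key, int amount) {
            AtomicInteger existing = counts.get(key);
            if (existing == null) {
                AtomicInteger fresh = new AtomicInteger(amount);
                AtomicInteger prev = counts.putIfAbsent(key, fresh);
                if (prev != null) {
                    prev.addAndGet(amount); // lost the race; update the winner
                }
            } else {
                existing.addAndGet(amount);
            }
        }
    }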

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index e6e8b3d..73c50a1 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -37,7 +37,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
         // business logic.
-        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        pipeline.addLast("handler", new StormClientErrorHandler(client.dstAddressPrefixedName));
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index ed5797d..04f2a4b 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -16,14 +16,36 @@
 (ns backtype.storm.messaging.netty-unit-test
   (:use [clojure test])
   (:import [backtype.storm.messaging TransportFactory])
-  (:use [backtype.storm bootstrap testing util]))
+  (:use [backtype.storm bootstrap testing util])
+  (:use [backtype.storm.daemon.worker :only [is-connection-ready]]))
 
 (bootstrap)
 
 (def port 6700)
 (def task 1)
 
+;; In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be
+;; activated once all the workers' connections are ready.  The tests in this file however launch Netty servers and
+;; clients directly, and thus we must ensure manually that the server and the client connections are ready before we
+;; commence testing.  If we don't do this, then we will lose the first messages being sent between the client and the
+;; server, which will fail the tests.
+(defn- wait-until-ready
+  ([connections]
+      (do (log-message "Waiting until all Netty connections are ready...")
+          (wait-until-ready connections 0)))
+  ([connections waited-ms]
+    (let [interval-ms 10
+          max-wait-ms 5000]
+      (if-not (every? is-connection-ready connections)
+        (if (<= waited-ms max-wait-ms)
+          (do
+            (Thread/sleep interval-ms)
+            (wait-until-ready connections (+ waited-ms interval-ms)))
+          (throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms"))))
+        (log-message "All Netty connections are ready")))))
+
 (deftest test-basic
+  (log-message "Should send and receive a basic message")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
@@ -36,6 +58,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -46,6 +69,7 @@
     (.term context)))
 
 (deftest test-large-msg
+  (log-message "Should send and receive a large message")
   (let [req_msg (apply str (repeat 2048000 'c'))
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
@@ -58,6 +82,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -67,38 +92,9 @@
     (.close server)
     (.term context)))
 
-(deftest test-server-delayed
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        client (.connect context nil "localhost" port)
-
-        server (Thread.
-                (fn []
-                  (Thread/sleep 1000)
-                  (let [server (.bind context nil port)
-                        iter (.recv server 0 0)
-                        resp (.next iter)]
-                    (is (= task (.task resp)))
-                    (is (= req_msg (String. (.message resp))))
-                    (.close server)
-                  )))
-        _ (.start server)
-        _ (.send client task (.getBytes req_msg))
-        ]
-    (.close client)
-    (.join server)
-    (.term context)))
-
 (deftest test-batch
-  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+  (let [num-messages 100000
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
@@ -106,23 +102,25 @@
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
                     }
+        _ (log-message "Should send and receive many messages (testing with " num-messages " messages)")
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
-        client (.connect context nil "localhost" port)]
-    (doseq [num  (range 1 100000)]
+        client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])]
+    (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
 
     (let [resp (ArrayList.)
           received (atom 0)]
-      (while (< @received (- 100000 1))
+      (while (< @received (- num-messages 1))
         (let [iter (.recv server 0 0)]
           (while (.hasNext iter)
             (let [msg (.next iter)]
               (.add resp msg)
               (swap! received inc)
               ))))
-      (doseq [num  (range 1 100000)]
+      (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)
             resp_msg (String. (.message (.get resp (- num 1))))]
         (is (= req_msg resp_msg)))))
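
A note on the readiness check used throughout these tests: in Java terms, the helper simply polls each connection's status until every one reports Ready. The sketch below is a hypothetical Java equivalent, assuming only the ConnectionWithStatus API visible in these diffs (a status() method returning a nested Ready/Connecting/Closed enum); the class and method names in the sketch itself are illustrative, not Storm API.

import java.util.List;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.ConnectionWithStatus.Status;

final class ReadinessWaiter {
    private static final long INTERVAL_MS = 10;
    private static final long MAX_WAIT_MS = 5000;

    // Poll until every connection reports Ready, mirroring the Clojure
    // helper's 10 ms interval and 5000 ms timeout.
    static void waitUntilReady(List<ConnectionWithStatus> connections)
            throws InterruptedException {
        long waitedMs = 0;
        while (!allReady(connections)) {
            if (waitedMs > MAX_WAIT_MS) {
                throw new RuntimeException(
                    "Netty connections were not ready within " + MAX_WAIT_MS + " ms");
            }
            Thread.sleep(INTERVAL_MS);
            waitedMs += INTERVAL_MS;
        }
    }

    private static boolean allReady(List<ConnectionWithStatus> connections) {
        for (ConnectionWithStatus c : connections) {
            if (c.status() != Status.Ready) {
                return false;
            }
        }
        return true;
    }
}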

http://git-wip-us.apache.org/repos/asf/storm/blob/62788f29/storm-core/test/clj/backtype/storm/worker_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/worker_test.clj b/storm-core/test/clj/backtype/storm/worker_test.clj
new file mode 100644
index 0000000..f09baef
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/worker_test.clj
@@ -0,0 +1,38 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.worker-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
+  (:import [org.mockito Mockito])
+  (:use [backtype.storm bootstrap testing])
+  (:use [backtype.storm.daemon common])
+
+  (:require [backtype.storm.daemon [worker :as worker]])
+  )
+
+(bootstrap)
+
+(deftest test-worker-is-connection-ready
+  (let [connection (Mockito/mock ConnectionWithStatus)]
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready)
+    (is (= true (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting)
+    (is (= false (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed)
+    (is (= false (worker/is-connection-ready connection)))
+  ))
\ No newline at end of file
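
For reference while reading the mock above: the test relies only on ConnectionWithStatus exposing a status() method and a nested Status enum with Ready, Connecting, and Closed values. A hedged reconstruction of that shape follows; the actual 32-line file added in commit 62788f29 may differ in details (e.g. whether it implements or merely relates to IConnection).

package backtype.storm.messaging;

// Hypothetical sketch reconstructed from the test's usage; not the verbatim
// contents of the committed file.
public abstract class ConnectionWithStatus implements IConnection {

    public static enum Status {
        Connecting, // the connection is being established and is not yet usable
        Ready,      // the connection is established and messages can flow
        Closed      // the connection has been closed and can no longer be used
    }

    // Returns the current lifecycle status of this connection.
    public abstract Status status();
}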


[2/5] storm git commit: Clarify name of method for dropping messages

Posted by pt...@apache.org.
Clarify name of method for dropping messages

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97a76fc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97a76fc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97a76fc8

Branch: refs/heads/0.9.3-branch
Commit: 97a76fc896de508f015dbe32f1473ddbf10d736b
Parents: 81016c2
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:03:07 2015 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 18:09:56 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/97a76fc8/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f275ef6..77c20c8 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -443,14 +443,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
         LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
-        dropPendingMessages(msgs);
+        dropMessages(msgs);
     }
 
-    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+    private void dropMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-        LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
+        LOG.error("dropping {} message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
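
The diff context cuts off at the iteratorSize() signature. Given the comment "We consume the iterator by traversing and thus 'emptying' it", a plausible body for that helper (an assumption, since it is not shown in this hunk) is:

// Hypothetical body for iteratorSize(), inferred from the surrounding
// comment; counts the messages by draining the iterator.
private int iteratorSize(Iterator<TaskMessage> msgs) {
    int size = 0;
    if (msgs != null) {
        while (msgs.hasNext()) {
            msgs.next();
            size++;
        }
    }
    return size;
}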


[3/5] storm git commit: Track how many messages are being dropped when a connection is unavailable

Posted by pt...@apache.org.
Track how many messages are being dropped when a connection is unavailable

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81016c2e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81016c2e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81016c2e

Branch: refs/heads/0.9.3-branch
Commit: 81016c2ed7222da99138bc9971e335533d4cb518
Parents: 62788f2
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:01:27 2015 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 18:09:56 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81016c2e/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index ae8a8d5..f275ef6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -449,6 +449,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private void dropPendingMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
+        messagesLost.getAndAdd(msgCount);
         LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
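
The getAndAdd() call suggests messagesLost is one of the java.util.concurrent.atomic counters (AtomicLong or AtomicInteger); the field declaration itself is not shown in this hunk. A minimal sketch of the pattern, under that assumption:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone illustration; in Client.java these counters are
// fields of the client itself.
class DropAccounting {
    private final AtomicLong messagesLost = new AtomicLong(0);

    // getAndAdd is atomic, so concurrent send paths can account for dropped
    // messages without extra locking.
    void recordDropped(int msgCount) {
        messagesLost.getAndAdd(msgCount);
    }

    long lost() {
        return messagesLost.get();
    }
}

Since Client extends ConnectionWithStatus and implements IStatefulObject (see the class declaration in the hunk header), counters like this are presumably what the client reports as state/metrics.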
 


[5/5] storm git commit: add STORM-329 to changelog

Posted by pt...@apache.org.
add STORM-329 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b06d846
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b06d846
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b06d846

Branch: refs/heads/0.9.3-branch
Commit: 6b06d8468ff5e743fb12b85dd84fe0931041c2c3
Parents: 9138d9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Feb 24 18:18:43 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 18:18:43 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6b06d846/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5839b7a..7dd058d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.9.4
+ * STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages
  * STORM-130: Supervisor getting killed due to java.io.FileNotFoundException: File '../stormconf.ser' does not exist.
 
 ## 0.9.3-rc2


[4/5] storm git commit: Change log level for intentionally dropping messages from WARN to ERROR

Posted by pt...@apache.org.
Change log level for intentionally dropping messages from WARN to ERROR

This change makes the log level for dropping messages consistent in
Client.java.

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9138d9fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9138d9fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9138d9fc

Branch: refs/heads/0.9.3-branch
Commit: 9138d9fc255639b4d0d43657379ce467591e8ef2
Parents: 97a76fc
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:07:35 2015 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Feb 24 18:09:56 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9138d9fc/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 77c20c8..afce496 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -370,7 +370,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public synchronized void send(Iterator<TaskMessage> msgs) {
         if (closing) {
             int numMessages = iteratorSize(msgs);
-            LOG.warn("discarding {} messages because the Netty client to {} is being closed", numMessages,
+            LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
                 dstAddressPrefixedName);
             return;
         }
@@ -487,7 +487,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                     messagesSent.getAndAdd(batch.size());
                 }
                 else {
-                    LOG.warn("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                    LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
                         future.getCause());
                     closeChannelAndReconnect(future.getChannel());
                     messagesLost.getAndAdd(numMessages);
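
For context on the second hunk: Storm 0.9.x builds on Netty 3.x, where a write returns a ChannelFuture and completion is observed through a listener. A hedged sketch of the surrounding wiring (field and method names such as messagesSent, messagesLost, closeChannelAndReconnect, and dstAddressPrefixedName are taken from the diff; the rest is assumed for illustration):

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

// Hypothetical reconstruction of the write path around this hunk.
private void writeBatch(Channel channel, MessageBatch batch) {
    final int numMessages = batch.size();
    ChannelFuture future = channel.write(batch);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                messagesSent.getAndAdd(numMessages);
            } else {
                // ERROR (not WARN) after this commit, consistent with the
                // other dropped-message paths in Client.java.
                LOG.error("failed to send {} messages to {}: {}", numMessages,
                    dstAddressPrefixedName, future.getCause());
                closeChannelAndReconnect(future.getChannel());
                messagesLost.getAndAdd(numMessages);
            }
        }
    });
}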