You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/02/23 22:11:28 UTC

[1/6] storm git commit: STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages

Repository: storm
Updated Branches:
  refs/heads/master e6a29e07c -> d7334849b


STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages

Thanks to @tedxia for the initial work on this patch, which covered a
lot if not most of the work!


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/205eaf4e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/205eaf4e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/205eaf4e

Branch: refs/heads/master
Commit: 205eaf4ebe28ab5550a842ea9aabd23b41678743
Parents: 8036109
Author: Michael G. Noll <mn...@verisign.com>
Authored: Wed Feb 11 19:55:53 2015 +0100
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Wed Feb 11 19:55:53 2015 +0100

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  52 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   2 +-
 .../storm/messaging/ConnectionWithStatus.java   |  32 +
 .../backtype/storm/messaging/netty/Client.java  | 711 ++++++++++++-------
 .../messaging/netty/SaslStormClientHandler.java |   5 +-
 .../backtype/storm/messaging/netty/Server.java  | 182 +++--
 .../netty/StormClientPipelineFactory.java       |   5 +-
 .../storm/messaging/netty_unit_test.clj         |  71 +-
 .../test/clj/backtype/storm/worker_test.clj     |  38 +
 10 files changed, 736 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 35d20ff..bc7cb65 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -146,7 +146,7 @@ zmq.hwm: 0
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
-# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.
+# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period also needs to be longer than storm.zookeeper.session.timeout (default is 20s), so that we can abort the reconnection when the target worker is dead.
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 8bba5e4..15c6143 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -21,7 +21,7 @@
   (:import [java.util ArrayList HashMap])
   (:import [backtype.storm.utils TransferDrainer])
   (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.messaging TaskMessage IContext IConnection])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
   (:import [backtype.storm.security.auth AuthUtils])
   (:import [javax.security.auth Subject])
   (:import [java.security PrivilegedExceptionAction])
@@ -217,6 +217,10 @@
       :worker-id worker-id
       :cluster-state cluster-state
       :storm-cluster-state storm-cluster-state
+      ;; When the worker boots up, it starts to set up the initial connections
+      ;; to other workers. When all connections are ready, we enable this flag
+      ;; and the spouts and bolts are activated.
+      :worker-active-flag (atom false)
       :storm-active-atom (atom false)
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
@@ -321,7 +325,7 @@
     (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
      (reset!
       (:storm-active-atom worker)
-      (= :active (-> base :status :type))
+       (and (= :active (-> base :status :type)) @(:worker-active-flag worker))
       ))
      ))
 
@@ -343,6 +347,37 @@
               (.send drainer node+port->socket)))
           (.clear drainer))))))
 
+;; Check whether this messaging connection is ready to send data
+(defn is-connection-ready [^IConnection connection]
+  (if (instance?  ConnectionWithStatus connection)
+    (let [^ConnectionWithStatus connection connection
+          status (.status connection)]
+      (= status ConnectionWithStatus$Status/Ready))
+    true))
+
+;; all connections are ready
+(defn all-connections-ready [worker]
+    (let [connections (vals @(:cached-node+port->socket worker))]
+      (every? is-connection-ready connections)))
+
+;; When the worker boots up, we wait for all connections to be ready and then
+;; activate the spouts/bolts.
+(defn activate-worker-when-all-connections-ready
+  [worker]
+  (let [timer (:refresh-active-timer worker)
+        delay-secs 0
+        recur-secs 1]
+    (schedule timer
+      delay-secs
+      (fn this []
+        (if (all-connections-ready worker)
+          (do
+            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+              " with id "(:worker-id worker))
+            (reset! (:worker-active-flag worker) true))
+          (schedule timer recur-secs this :check-active false)
+            )))))
+
 (defn launch-receive-thread [worker]
   (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
   (msg-loader/launch-receive-thread!
@@ -395,21 +430,26 @@
         ;; do this here so that the worker process dies if this fails
         ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
         _ (heartbeat-fn)
- 
+
         executors (atom nil)
         ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
         ;; to the supervisor
         _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
         _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
 
+        receive-thread-shutdown (launch-receive-thread worker)
+
         refresh-connections (mk-refresh-connections worker)
 
         _ (refresh-connections nil)
+
+        _ (activate-worker-when-all-connections-ready worker)
+
         _ (refresh-storm-active worker nil)
- 
+
+
         _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
-        receive-thread-shutdown (launch-receive-thread worker)
-        
+
         transfer-tuples (mk-transfer-tuples-handler worker)
         
         transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)                                       

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index 801f22d..4aa67ab 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -70,4 +70,4 @@
 (defn mk-context [] 
   (let [context  (LocalContext. nil nil)]
     (.prepare ^IContext context nil)
-    context))
\ No newline at end of file
+    context))

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
new file mode 100644
index 0000000..38abc19
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
@@ -0,0 +1,32 @@
+package backtype.storm.messaging;
+
+public abstract class ConnectionWithStatus implements IConnection {
+
+  public static enum Status {
+
+    /**
+     * We are establishing an active connection with the target host. New data
+     * sending requests can be buffered for future sending, or dropped (e.g. when
+     * there is not enough memory). This varies between different IConnection
+     * implementations.
+     */
+    Connecting,
+
+    /**
+     * We have a live connection channel, which can be used to transfer data.
+     */
+    Ready,
+
+    /**
+     * The connection channel is closed or being closed. We don't accept further
+     * data sending or receiving. All data sending requests will be dropped.
+     */
+    Closed
+  };
+
+  /**
+   * whether this connection is available to transfer data
+   */
+  public abstract Status status();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index d770481..5d99718 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -24,15 +24,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.*;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -42,344 +42,577 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
 
-public class Client implements IConnection, IStatefulObject{
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *       the remote destination is currently unavailable.
+ * - A flusher thread runs in the background.  It will, at fixed intervals, check for any pending messages
+ *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
+ *   enabled.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
+    private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
+
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-    
-    private AtomicInteger totalReconnects;
-    private AtomicInteger messagesSent;
-    private AtomicInteger messagesLostReconnect;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private boolean closing;
-
-    private int messageBatchSize;
-    
-    private AtomicLong pendings;
-    
-    Map storm_conf;
+    private final InetSocketAddress dstAddress;
+    protected final String dstAddressPrefixedName;
+
+    /**
+     * The channel used for all write operations from this client to the remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
+
+
+    /**
+     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
+     */
+    private final int maxReconnectionAttempts;
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * This flag is set to true if and only if a client instance is being closed.
+     */
+    private volatile boolean closing = false;
+
+    /**
+     * When set to true, then the background flusher thread will flush any pending messages on its next run.
+     */
+    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
+
+    /**
+     * The absolute time (in ms) when the next background flush should be performed.
+     *
+     * Note: The flush operation will only be performed if backgroundFlushingEnabled is true, too.
+     */
+    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
+
+    /**
+     * The time interval (in ms) at which the background flusher thread will be run to check for any pending messages
+     * to be flushed.
+     */
+    private final int flushCheckIntervalMs;
+
+    /**
+     * How many messages should be batched together before sending them to the remote destination.
+     *
+     * Messages are batched to optimize network throughput at the expense of latency.
+     */
+    private final int messageBatchSize;
 
     private MessageBatch messageBatch = null;
-    private AtomicLong flushCheckTimer;
-    private int flushCheckInterval;
-    private ScheduledExecutorService scheduler;
+    private final ListeningScheduledExecutorService scheduler;
+    protected final Map stormConf;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, 
-            ScheduledExecutorService scheduler, String host, int port) {
-    	this.storm_conf = storm_conf;
-        this.factory = factory;
-        this.scheduler = scheduler;
-        channelRef = new AtomicReference<Channel>(null);
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
-        pendings = new AtomicLong(0);
-        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
-        totalReconnects = new AtomicInteger(0);
-        messagesSent = new AtomicInteger(0);
-        messagesLostReconnect = new AtomicInteger(0);
-
-        // Configure
-        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
-        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);
-
-        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
-        
-        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
-
-        LOG.info("New Netty Client, connect to " + host + ", " + port
-                + ", config: " + ", buffer_size: " + buffer_size);
-
-        bootstrap = new ClientBootstrap(factory);
+        this.stormConf = stormConf;
+        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
+        int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+        messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+
+        maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+
+        // Initiate connection to remote destination
+        bootstrap = createClientBootstrap(factory, bufferSize);
+        dstAddress = new InetSocketAddress(host, port);
+        dstAddressPrefixedName = prefixedName(dstAddress);
+        connect(NO_DELAY_MS);
+
+        // Launch background flushing thread
+        pauseBackgroundFlushing();
+        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
+        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
+            TimeUnit.MILLISECONDS);
+    }
+
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", buffer_size);
+        bootstrap.setOption("sendBufferSize", bufferSize);
         bootstrap.setOption("keepAlive", true);
-
-        // Set up the pipeline factory.
         bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+        return bootstrap;
+    }
 
-        // Start the connection attempt.
-        remote_addr = new InetSocketAddress(host, port);
-        
-        // setup the connection asyncly now
-        scheduler.execute(new Runnable() {
-            @Override
-            public void run() {   
-                connect();
-            }
-        });
-        
-        Runnable flusher = new Runnable() {
+    private String prefixedName(InetSocketAddress dstAddress) {
+        if (null != dstAddress) {
+            return PREFIX + dstAddress.toString();
+        }
+        return "";
+    }
+
+    private Runnable createBackgroundFlusher() {
+        return new Runnable() {
             @Override
             public void run() {
-
-                if(!closing) {
-                    long flushCheckTime = flushCheckTimer.get();
-                    long now = System.currentTimeMillis();
-                    if (now > flushCheckTime) {
-                        Channel channel = channelRef.get();
-                        if (null != channel && channel.isWritable()) {
-                            flush(channel);
-                        }
-                    }
+                if(!closing && backgroundFlushingEnabled.get() && nowMillis() > nextBackgroundFlushTimeMs.get()) {
+                    LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(),
+                        dstAddressPrefixedName);
+                    flushPendingMessages();
                 }
-                
             }
         };
-        
-        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
-        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
+    }
+
+    private void pauseBackgroundFlushing() {
+        backgroundFlushingEnabled.set(false);
+    }
+
+    private void resumeBackgroundFlushing() {
+        backgroundFlushingEnabled.set(true);
+    }
+
+    private synchronized void flushPendingMessages() {
+        Channel channel = channelRef.get();
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel)) {
+                if (channel.isWritable()) {
+                    pauseBackgroundFlushing();
+                    MessageBatch toBeFlushed = messageBatch;
+                    flushMessages(channel, toBeFlushed);
+                    messageBatch = null;
+                }
+                else if (closing) {
+                    // Ensure background flushing is enabled so that we definitely have a chance to re-try the flush
+                    // operation in case the client is being gracefully closed (where we have a brief time window where
+                    // the client will wait for pending messages to be sent).
+                    resumeBackgroundFlushing();
+                }
+            }
+            else {
+                closeChannelAndReconnect(channel);
+            }
+        }
+    }
+
+    private long nowMillis() {
+        return System.currentTimeMillis();
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    private synchronized void connect() {
+    private synchronized void connect(long delayMs) {
         try {
+            if (closing) {
+                return;
+            }
 
-            Channel channel = channelRef.get();
-            if (channel != null && channel.isConnected()) {
+            if (connectionEstablished(channelRef.get())) {
                 return;
             }
 
-            int tried = 0;
-            //setting channel to null to make sure we throw an exception when reconnection fails
-            channel = null;
-            while (tried <= max_retries) {
-
-                LOG.info("Reconnect started for {}... [{}]", name(), tried);
-                LOG.debug("connection started...");
-
-                totalReconnects.getAndIncrement();
-                ChannelFuture future = bootstrap.connect(remote_addr);
-                future.awaitUninterruptibly();
-                Channel current = future.getChannel();
-                if (!future.isSuccess()) {
-                    if (null != current) {
-                        current.close();
+            connectionAttempts.getAndIncrement();
+            if (reconnectingAllowed()) {
+                totalConnectionAttempts.getAndIncrement();
+                LOG.info("connection attempt {} to {} scheduled to run in {} ms", connectionAttempts.get(),
+                    dstAddressPrefixedName, delayMs);
+                ListenableFuture<Channel> channelFuture = scheduler.schedule(
+                    new Connector(dstAddress, connectionAttempts.get()), delayMs, TimeUnit.MILLISECONDS);
+                Futures.addCallback(channelFuture, new FutureCallback<Channel>() {
+                    @Override public void onSuccess(Channel result) {
+                        if (connectionEstablished(result)) {
+                            setChannel(result);
+                            LOG.info("connection established to {}", dstAddressPrefixedName);
+                            connectionAttempts.set(0);
+                        }
+                        else {
+                            reconnectAgain(new RuntimeException("Returned channel was actually not established"));
+                        }
+                    }
+
+                    @Override public void onFailure(Throwable t) {
+                        reconnectAgain(t);
                     }
-                } else {
-                    channel = current;
-                    break;
-                }
-                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
-                tried++;  
+
+                    private void reconnectAgain(Throwable t) {
+                        String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                            dstAddressPrefixedName);
+                        String failureMsg = (t == null)? baseMsg : baseMsg + ": " + t.toString();
+                        LOG.error(failureMsg);
+                        long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+                        connect(nextDelayMs);
+                    }
+                });
             }
-            if (null != channel) {
-                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
-                channelRef.set(channel);
-            } else {
+            else {
                 close();
-                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
+                throw new RuntimeException("Giving up to connect to " + dstAddressPrefixedName + " after " +
+                    connectionAttempts + " failed attempts");
             }
-        } catch (InterruptedException e) {
-            throw new RuntimeException("connection failed " + name(), e);
         }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to connect to " + dstAddressPrefixedName, e);
+        }
+    }
+
+    private void setChannel(Channel channel) {
+        channelRef.set(channel);
+    }
+
+    private boolean reconnectingAllowed() {
+        return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
+        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
+        // anything can be read or written to the channel.
+        //
+        // See:
+        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+        return channel != null && channel.isConnected();
     }
 
     /**
-     * Enqueue task messages to be sent to server
+     * Note:  Storm will check via this method whether a worker can be activated safely during the initial startup of a
+     * topology.  The worker will only be activated once all of its connections are ready.
      */
-    synchronized public void send(Iterator<TaskMessage> msgs) {
+    @Override
+    public Status status() {
+        if (closing) {
+            return Status.Closed;
+        }
+        else if (!connectionEstablished(channelRef.get())) {
+            return Status.Connecting;
+        }
+        else {
+            return Status.Ready;
+        }
+    }
+
+    /**
+     * Receiving messages is not supported by a client.
+     *
+     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
+     */
+    @Override
+    public Iterator<TaskMessage> recv(int flags, int clientId) {
+        throw new UnsupportedOperationException("Client connection should not receive any messages");
+    }
 
-        // throw exception if the client is being closed
+    @Override
+    public void send(int taskId, byte[] payload) {
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
+    }
+
+    /**
+     * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
+     */
+    @Override
+    public synchronized void send(Iterator<TaskMessage> msgs) {
         if (closing) {
-            throw new RuntimeException("Client is being closed, and does not take requests any more");
+            int numMessages = iteratorSize(msgs);
+            LOG.warn("discarding {} messages because the Netty client to {} is being closed", numMessages,
+                dstAddressPrefixedName);
+            return;
         }
-        
-        if (null == msgs || !msgs.hasNext()) {
+
+        if (!hasMessages(msgs)) {
             return;
         }
 
         Channel channel = channelRef.get();
-        if (null == channel) {
-            connect();
-            channel = channelRef.get();
+        if (!connectionEstablished(channel)) {
+            // Closing the channel and reconnecting should be done before handling the messages.
+            closeChannelAndReconnect(channel);
+            handleMessagesWhenConnectionIsUnavailable(msgs);
+            return;
         }
 
+        // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
-            if (!channel.isConnected()) {
-                connect();
-                channel = channelRef.get();
-            }
             TaskMessage message = msgs.next();
-            if (null == messageBatch) {
+            if (messageBatch == null) {
                 messageBatch = new MessageBatch(messageBatchSize);
             }
 
             messageBatch.add(message);
             if (messageBatch.isFull()) {
                 MessageBatch toBeFlushed = messageBatch;
-                flushRequest(channel, toBeFlushed);
+                flushMessages(channel, toBeFlushed);
                 messageBatch = null;
             }
         }
 
-        if (null != messageBatch && !messageBatch.isEmpty()) {
-            if (channel.isWritable()) {
-                flushCheckTimer.set(Long.MAX_VALUE);
-                
-                // Flush as fast as we can to reduce the latency
+        // Handle any remaining messages in case the "last" batch was not full.
+        if (containsMessages(messageBatch)) {
+            if (connectionEstablished(channel) && channel.isWritable()) {
+                // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
+                pauseBackgroundFlushing();
                 MessageBatch toBeFlushed = messageBatch;
                 messageBatch = null;
-                flushRequest(channel, toBeFlushed);
-                
-            } else {
-                // when channel is NOT writable, it means the internal netty buffer is full. 
-                // In this case, we can try to buffer up more incoming messages.
-                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
+                flushMessages(channel, toBeFlushed);
+            }
+            else {
+                // We cannot write to the channel, which means Netty's internal write buffer is full.
+                // In this case, we buffer the remaining messages and wait for the next messages to arrive.
+                //
+                // Background:
+                // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
+                // This represents the amount of data waiting to be flushed to operating system buffers.  If the
+                // outstanding data exceeds this value then the channel is set to non-writable.  When this happens, an
+                // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
+                // has been flushed to the system buffers.
+                //
+                // See http://stackoverflow.com/questions/14049260
+                resumeBackgroundFlushing();
+                nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
             }
         }
 
     }
 
-    public String name() {
-        if (null != remote_addr) {
-            return PREFIX + remote_addr.toString();
-        }
-        return "";
+    private boolean hasMessages(Iterator<TaskMessage> msgs) {
+        return msgs != null && msgs.hasNext();
     }
 
-    private synchronized void flush(Channel channel) {
-        if (!closing) {
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                flushCheckTimer.set(Long.MAX_VALUE);
-                flushRequest(channel, toBeFlushed);
-                messageBatch = null;
+    /**
+     * We will drop pending messages and let at-least-once message replay kick in.
+     *
+     * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
+     * especially for topologies that disable message acking, because we don't know whether the connection recovery will
+     * succeed or not, and how long the recovery will take.
+     */
+    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
+        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+        dropPendingMessages(msgs);
+    }
+
+    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+        // We consume the iterator by traversing and thus "emptying" it.
+        int msgCount = iteratorSize(msgs);
+        LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
+    }
+
+    private int iteratorSize(Iterator<TaskMessage> msgs) {
+        int size = 0;
+        if (msgs != null) {
+            while (msgs.hasNext()) {
+                size++;
+                msgs.next();
             }
         }
+        return size;
     }
-    
+
     /**
-     * gracefully close this client.
-     * 
-     * We will send all existing requests, and then invoke close_n_release()
-     * method
+     * Asynchronously writes the message batch to the channel.
+     *
+     * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    public synchronized void close() {
-        if (!closing) {
-            closing = true;
-            LOG.info("Closing Netty Client " + name());
-            
-            if (null != messageBatch && !messageBatch.isEmpty()) {
-                MessageBatch toBeFlushed = messageBatch;
-                Channel channel = channelRef.get();
-                if (channel != null) {
-                    flushRequest(channel, toBeFlushed);
+    private synchronized void flushMessages(Channel channel, final MessageBatch batch) {
+        if (!containsMessages(batch)) {
+            return;
+        }
+
+        final int numMessages = batch.size();
+        pendingMessages.getAndAdd(numMessages);
+        LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
+        ChannelFuture future = channel.write(batch);
+        future.addListener(new ChannelFutureListener() {
+
+            public void operationComplete(ChannelFuture future) throws Exception {
+                pendingMessages.getAndAdd(0 - numMessages);
+                if (future.isSuccess()) {
+                    LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+                    messagesSent.getAndAdd(batch.size());
+                }
+                else {
+                    LOG.warn("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                        future.getCause());
+                    closeChannelAndReconnect(future.getChannel());
+                    messagesLost.getAndAdd(numMessages);
                 }
-                messageBatch = null;
-            }
-        
-            //wait for pendings to exit
-            final long timeoutMilliSeconds = 600 * 1000; //600 seconds
-            final long start = System.currentTimeMillis();
-            
-            LOG.info("Waiting for pending batchs to be sent with "+ name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
-            
-            while(pendings.get() != 0) {
-                try {
-                    long delta = System.currentTimeMillis() - start;
-                    if (delta > timeoutMilliSeconds) {
-                        LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
-                        break;
-                    }
-                    Thread.sleep(1000); //sleep 1s
-                } catch (InterruptedException e) {
-                    break;
-                } 
             }
-            
-            close_n_release();
-        }
+
+        });
     }
 
-    /**
-     * close_n_release() is invoked after all messages have been sent.
-     */
-    private void close_n_release() {
-        if (channelRef.get() != null) {
-            channelRef.get().close();
-            LOG.debug("channel {} closed",remote_addr);
+    private synchronized void closeChannelAndReconnect(Channel channel) {
+        if (channel != null) {
+            channel.close();
+            if (channelRef.compareAndSet(channel, null)) {
+                connect(NO_DELAY_MS);
+            }
         }
     }
 
-    @Override
-    public Iterator<TaskMessage> recv(int flags, int clientId) {
-        throw new RuntimeException("Client connection should not receive any messages");
+    private boolean containsMessages(MessageBatch batch) {
+        return batch != null && !batch.isEmpty();
     }
 
+    /**
+     * Gracefully close this client.
+     *
+     * We will attempt to send any pending messages (i.e. messages currently buffered in memory) before closing the
+     * client.
+     */
     @Override
-    public void send(int taskId, byte[] payload) {
-        TaskMessage msg = new TaskMessage(taskId, payload);
-        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
-        wrapper.add(msg);
-        send(wrapper.iterator());
+    public void close() {
+        if (!closing) {
+            LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+            // Set closing to true to prevent any further reconnection attempts.
+            closing = true;
+            flushPendingMessages();
+            waitForPendingMessagesToBeSent();
+            closeChannel();
+        }
     }
 
-    private void flushRequest(Channel channel, final MessageBatch requests) {
-        if (requests == null)
-            return;
-
-        pendings.getAndAdd(requests.size());
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-
-                pendings.getAndAdd(0-requests.size());
-                if (!future.isSuccess()) {
-                    LOG.info(
-                            "failed to send requests to " + remote_addr.toString() + ": ", future.getCause());
-
-                    Channel channel = future.getChannel();
-
-                    if (null != channel) {
-                        channel.close();
-                        channelRef.compareAndSet(channel, null);
-                    }
-                    messagesLostReconnect.getAndAdd(requests.size());
-                } else {
-                    messagesSent.getAndAdd(requests.size());
-                    LOG.debug("{} request(s) sent", requests.size());
+    private synchronized void waitForPendingMessagesToBeSent() {
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+            PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+        long totalPendingMsgs = pendingMessages.get();
+        long startMs = nowMillis();
+        while (pendingMessages.get() != 0) {
+            try {
+                long deltaMs = nowMillis() - startMs;
+                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
+                        "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                    break;
                 }
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
             }
-        });
+            catch (InterruptedException e) {
+                break;
+            }
+        }
+
+    }
+
+    private synchronized void closeChannel() {
+        if (channelRef.get() != null) {
+            channelRef.get().close();
+            LOG.debug("channel to {} closed", dstAddressPrefixedName);
+        }
     }
 
     @Override
     public Object getState() {
-        LOG.info("Getting metrics for connection to "+remote_addr);
+        LOG.info("Getting metrics for client connection to {}", dstAddressPrefixedName);
         HashMap<String, Object> ret = new HashMap<String, Object>();
-        ret.put("reconnects", totalReconnects.getAndSet(0));
+        ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
         ret.put("sent", messagesSent.getAndSet(0));
-        ret.put("pending", pendings.get());
-        ret.put("lostOnSend", messagesLostReconnect.getAndSet(0));
-        ret.put("dest", remote_addr.toString());
+        ret.put("pending", pendingMessages.get());
+        ret.put("lostOnSend", messagesLost.getAndSet(0));
+        ret.put("dest", dstAddress.toString());
+        String src = srcAddressName();
+        if (src != null) {
+            ret.put("src", src);
+        }
+        return ret;
+    }
+
+    private String srcAddressName() {
+        String name = null;
         Channel c = channelRef.get();
         if (c != null) {
             SocketAddress address = c.getLocalAddress();
             if (address != null) {
-              ret.put("src", address.toString());
+                name = address.toString();
             }
         }
-        return ret;
+        return name;
+    }
+
+    @Override public String toString() {
+        return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
     }
-}
 
+    /**
+     * Asynchronously establishes a Netty connection to the remote address, returning a Netty Channel on success.
+     */
+    private class Connector implements Callable<Channel> {
+
+        private final InetSocketAddress address;
+        private final int connectionAttempt;
+
+        public Connector(InetSocketAddress address, int connectionAttempt) {
+            this.address = address;
+            if (connectionAttempt < 1) {
+                throw new IllegalArgumentException("connection attempt must be >= 1 (you provided " +
+                    connectionAttempt + ")");
+            }
+            this.connectionAttempt = connectionAttempt;
+        }
+
+        @Override public Channel call() throws Exception {
+            LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+            Channel channel = null;
+            ChannelFuture future = bootstrap.connect(address);
+            future.awaitUninterruptibly();
+            Channel current = future.getChannel();
+
+            if (future.isSuccess() && connectionEstablished(current)) {
+                channel = current;
+                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), channel.toString(),
+                    connectionAttempt);
+            }
+            else {
+                LOG.debug("failed to connect to {} [attempt {}]", address.toString(), connectionAttempt);
+                if (current != null) {
+                    current.close();
+                }
+            }
+            return channel;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index f94cbc3..32ecb40 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -146,9 +146,8 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     private void getSASLCredentials() throws IOException {
-        topologyName = (String) this.client.storm_conf
-                .get(Config.TOPOLOGY_NAME);
-        String secretKey = SaslUtils.getSecretKey(this.client.storm_conf);
+        topologyName = (String) this.client.stormConf.get(Config.TOPOLOGY_NAME);
+        String secretKey = SaslUtils.getSecretKey(this.client.stormConf);
         if (secretKey != null) {
             token = secretKey.getBytes();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index d1f10e1..e984144 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -40,12 +40,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.Utils;
 
-class Server implements IConnection, IStatefulObject {
+class Server extends ConnectionWithStatus implements IStatefulObject {
+
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
@@ -67,7 +69,7 @@ class Server implements IConnection, IStatefulObject {
     private volatile HashMap<Integer, Integer> taskToQueueId = null;
     int roundRobinQueueId;
 	
-    boolean closing = false;
+    private volatile boolean closing = false;
     List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
     
     
@@ -120,45 +122,45 @@ class Server implements IConnection, IStatefulObject {
     }
     
     private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
-      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
-      
-      for (int i = 0; i < msgs.size(); i++) {
-        TaskMessage message = msgs.get(i);
-        int task = message.task();
-        
-        if (task == -1) {
-          closing = true;
-          return null;
-        }
-        
-        Integer queueId = getMessageQueueId(task);
-        
-        if (null == messageGroups[queueId]) {
-          messageGroups[queueId] = new ArrayList<TaskMessage>();
+        ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+
+        for (int i = 0; i < msgs.size(); i++) {
+            TaskMessage message = msgs.get(i);
+            int task = message.task();
+
+            if (task == -1) {
+                closing = true;
+                return null;
+            }
+
+            Integer queueId = getMessageQueueId(task);
+
+            if (null == messageGroups[queueId]) {
+                messageGroups[queueId] = new ArrayList<TaskMessage>();
+            }
+            messageGroups[queueId].add(message);
         }
-        messageGroups[queueId].add(message);
-      }
-      return messageGroups;
+        return messageGroups;
     }
     
     private Integer getMessageQueueId(int task) {
-      // try to construct the map from taskId -> queueId in round robin manner.
-      Integer queueId = taskToQueueId.get(task);
-      if (null == queueId) {
-        synchronized (this) {
-          queueId = taskToQueueId.get(task);
-          if (queueId == null) {
-            queueId = roundRobinQueueId++;
-            if (roundRobinQueueId == queueCount) {
-              roundRobinQueueId = 0;
+        // try to construct the map from taskId -> queueId in round robin manner.
+        Integer queueId = taskToQueueId.get(task);
+        if (null == queueId) {
+            synchronized (this) {
+                queueId = taskToQueueId.get(task);
+                if (queueId == null) {
+                    queueId = roundRobinQueueId++;
+                    if (roundRobinQueueId == queueCount) {
+                        roundRobinQueueId = 0;
+                    }
+                    HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
+                    newRef.put(task, queueId);
+                    taskToQueueId = newRef;
+                }
             }
-            HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
-            newRef.put(task, queueId);
-            taskToQueueId = newRef;
-          }
         }
-      }
-      return queueId;
+        return queueId;
     }
 
     private void addReceiveCount(String from, int amount) {
@@ -182,57 +184,57 @@ class Server implements IConnection, IStatefulObject {
 
     /**
      * enqueue a received message 
-     * @param message
      * @throws InterruptedException
      */
     protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException {
-      
-      if (null == msgs || msgs.size() == 0 || closing) {
-        return;
-      }
-      addReceiveCount(from, msgs.size());
-      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
-      
-      if (null == messageGroups || closing) {
-        return;
-      }
-      
-      for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
-        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
-        if (null != msgGroup) {
-          message_queue[receiverId].put(msgGroup);
-          pendingMessages[receiverId].addAndGet(msgGroup.size());
+        if (null == msgs || msgs.size() == 0 || closing) {
+            return;
+        }
+        addReceiveCount(from, msgs.size());
+        ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+
+        if (null == messageGroups || closing) {
+            return;
+        }
+
+        for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+            ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+            if (null != msgGroup) {
+                message_queue[receiverId].put(msgGroup);
+                pendingMessages[receiverId].addAndGet(msgGroup.size());
+            }
         }
-      }
     }
-    
-    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
-      if (closing) {
-        return closeMessage.iterator();
-      }
-      
-      ArrayList<TaskMessage> ret = null; 
-      int queueId = receiverId % queueCount;
-      if ((flags & 0x01) == 0x01) { 
+
+    public Iterator<TaskMessage> recv(int flags, int receiverId) {
+        if (closing) {
+            return closeMessage.iterator();
+        }
+
+        ArrayList<TaskMessage> ret = null;
+        int queueId = receiverId % queueCount;
+        if ((flags & 0x01) == 0x01) {
             //non-blocking
             ret = message_queue[queueId].poll();
-        } else {
+        }
+        else {
             try {
                 ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
                 ret = request;
-            } catch (InterruptedException e) {
+            }
+            catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
                 ret = null;
             }
         }
-      
-      if (null != ret) {
-        messagesDequeued.addAndGet(ret.size());
-        pendingMessages[queueId].addAndGet(0 - ret.size());
-        return ret.iterator();
-      }
-      return null;
+
+        if (null != ret) {
+            messagesDequeued.addAndGet(ret.size());
+            pendingMessages[queueId].addAndGet(0 - ret.size());
+            return ret.iterator();
+        }
+        return null;
     }
    
     /**
@@ -264,11 +266,11 @@ class Server implements IConnection, IStatefulObject {
     }
 
     public void send(int task, byte[] message) {
-        throw new RuntimeException("Server connection should not send any messages");
+        throw new UnsupportedOperationException("Server connection should not send any messages");
     }
     
     public void send(Iterator<TaskMessage> msgs) {
-      throw new RuntimeException("Server connection should not send any messages");
+      throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 	
     public String name() {
@@ -276,8 +278,35 @@ class Server implements IConnection, IStatefulObject {
     }
 
     @Override
+    public Status status() {
+        if (closing) {
+          return Status.Closed;
+        }
+        else if (!connectionEstablished(allChannels)) {
+            return Status.Connecting;
+        }
+        else {
+            return Status.Ready;
+        }
+    }
+
+    private boolean connectionEstablished(Channel channel) {
+      return channel != null && channel.isBound();
+    }
+
+    private boolean connectionEstablished(ChannelGroup allChannels) {
+        boolean allEstablished = true;
+        for (Channel channel : allChannels) {
+            if (!(connectionEstablished(channel))) {
+                allEstablished = false;
+                break;
+            }
+        }
+        return allEstablished;
+    }
+
     public Object getState() {
-        LOG.info("Getting metrics for server on " + port);
+        LOG.info("Getting metrics for server on port {}", port);
         HashMap<String, Object> ret = new HashMap<String, Object>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
         ArrayList<Integer> pending = new ArrayList<Integer>(pendingMessages.length);
@@ -300,4 +329,9 @@ class Server implements IConnection, IStatefulObject {
         ret.put("enqueued", enqueued);
         return ret;
     }
+
+    @Override public String toString() {
+       return String.format("Netty server listening on port %s", port);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 1ea382b..2adfceb 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -39,15 +39,14 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
 
-        boolean isNettyAuth = (Boolean) this.client.storm_conf
-                .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        boolean isNettyAuth = (Boolean) this.client.stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
             pipeline.addLast("saslClientHandler", new SaslStormClientHandler(
                     client));
         }
         // business logic.
-        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
+        pipeline.addLast("handler", new StormClientErrorHandler(client.dstAddressPrefixedName));
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 2061ddf..b152af2 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -16,14 +16,36 @@
 (ns backtype.storm.messaging.netty-unit-test
   (:use [clojure test])
   (:import [backtype.storm.messaging TransportFactory])
-  (:use [backtype.storm bootstrap testing util]))
+  (:use [backtype.storm bootstrap testing util])
+  (:use [backtype.storm.daemon.worker :only [is-connection-ready]]))
 
 (bootstrap)
 
 (def port 6700)
 (def task 1)
 
+;; In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be
+;; activated once all the workers' connections are ready.  The tests in this file however launch Netty servers and
+;; clients directly, and thus we must ensure manually that the server and the client connections are ready before we
+;; commence testing.  If we don't do this, then we will lose the first messages being sent between the client and the
+;; server, which will fail the tests.
+(defn- wait-until-ready
+  ([connections]
+      (do (log-message "Waiting until all Netty connections are ready...")
+          (wait-until-ready connections 0)))
+  ([connections waited-ms]
+    (let [interval-ms 10
+          max-wait-ms 5000]
+      (if-not (every? is-connection-ready connections)
+        (if (<= waited-ms max-wait-ms)
+          (do
+            (Thread/sleep interval-ms)
+            (wait-until-ready connections (+ waited-ms interval-ms)))
+          (throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms"))))
+        (log-message "All Netty connections are ready")))))
+
 (deftest test-basic
+  (log-message "Should send and receive a basic message")
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
@@ -37,6 +59,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -47,6 +70,7 @@
     (.term context)))
 
 (deftest test-large-msg
+  (log-message "Should send and receive a large message")
   (let [req_msg (apply str (repeat 2048000 'c'))
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
@@ -60,6 +84,7 @@
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])
         _ (.send client task (.getBytes req_msg))
         iter (.recv server 0 0)
         resp (.next iter)]
@@ -69,39 +94,9 @@
     (.close server)
     (.term context)))
 
-(deftest test-server-delayed
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-AUTHENTICATION false
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        client (.connect context nil "localhost" port)
-
-        server (Thread.
-                (fn []
-                  (Thread/sleep 1000)
-                  (let [server (.bind context nil port)
-                        iter (.recv server 0 0)
-                        resp (.next iter)]
-                    (is (= task (.task resp)))
-                    (is (= req_msg (String. (.message resp))))
-                    (.close server)
-                  )))
-        _ (.start server)
-        _ (.send client task (.getBytes req_msg))
-        ]
-    (.close client)
-    (.join server)
-    (.term context)))
-
 (deftest test-batch
-  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+  (let [num-messages 100000
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-AUTHENTICATION false
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
@@ -110,23 +105,25 @@
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
                     }
+        _ (log-message "Should send and receive many messages (testing with " num-messages " messages)")
         context (TransportFactory/makeContext storm-conf)
         server (.bind context nil port)
-        client (.connect context nil "localhost" port)]
-    (doseq [num  (range 1 100000)]
+        client (.connect context nil "localhost" port)
+        _ (wait-until-ready [server client])]
+    (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
 
     (let [resp (ArrayList.)
           received (atom 0)]
-      (while (< @received (- 100000 1))
+      (while (< @received (- num-messages 1))
         (let [iter (.recv server 0 0)]
           (while (.hasNext iter)
             (let [msg (.next iter)]
               (.add resp msg)
               (swap! received inc)
               ))))
-      (doseq [num  (range 1 100000)]
+      (doseq [num  (range 1 num-messages)]
       (let [req_msg (str num)
             resp_msg (String. (.message (.get resp (- num 1))))]
         (is (= req_msg resp_msg)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/205eaf4e/storm-core/test/clj/backtype/storm/worker_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/worker_test.clj b/storm-core/test/clj/backtype/storm/worker_test.clj
new file mode 100644
index 0000000..f09baef
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/worker_test.clj
@@ -0,0 +1,38 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.worker-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
+  (:import [org.mockito Mockito])
+  (:use [backtype.storm bootstrap testing])
+  (:use [backtype.storm.daemon common])
+
+  (:require [backtype.storm.daemon [worker :as worker]])
+  )
+
+(bootstrap)
+
+(deftest test-worker-is-connection-ready
+  (let [connection (Mockito/mock ConnectionWithStatus)]
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready)
+    (is (= true (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting)
+    (is (= false (worker/is-connection-ready connection)))
+
+    (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed)
+    (is (= false (worker/is-connection-ready connection)))
+  ))
\ No newline at end of file


[5/6] storm git commit: merge fix for STORM-329

Posted by pt...@apache.org.
merge fix for STORM-329


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2916315
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2916315
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2916315

Branch: refs/heads/master
Commit: c291631563e2ab0eaa7f197dc95b340552bd12f7
Parents: e6a29e0 522d96e
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Feb 23 14:41:42 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Feb 23 14:41:42 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 conf/defaults.yaml                              |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  52 +-
 .../src/clj/backtype/storm/messaging/local.clj  |   2 +-
 .../storm/messaging/ConnectionWithStatus.java   |  32 +
 .../backtype/storm/messaging/netty/Client.java  | 712 ++++++++++++-------
 .../messaging/netty/SaslStormClientHandler.java |   5 +-
 .../backtype/storm/messaging/netty/Server.java  | 182 +++--
 .../netty/StormClientPipelineFactory.java       |   5 +-
 .../storm/messaging/netty_unit_test.clj         |  40 +-
 .../test/clj/backtype/storm/worker_test.clj     |  37 +
 11 files changed, 737 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/CHANGELOG.md
----------------------------------------------------------------------
diff --cc CHANGELOG.md
index 3ed28c0,d0e4a03..7d465dd
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -1,7 -1,4 +1,8 @@@
  ## 0.10.0
++ * STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages
 + * STORM-641: Add total number of topologies to api/v1/cluster/summary.
 + * STORM-640: Storm UI vulnerable to poodle attack.
 + * STORM-651: improvements to storm.cmd
   * STORM-456: Storm UI: cannot navigate to topology page when name contains spaces.
   * STORM-627: Storm-hbase configuration error.
   * STORM-248: cluster.xml location is hardcoded for workers

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index 46ded42,15c6143..dad9354
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -15,23 -15,13 +15,23 @@@
  ;; limitations under the License.
  (ns backtype.storm.daemon.worker
    (:use [backtype.storm.daemon common])
 -  (:use [backtype.storm bootstrap])
 +  (:use [backtype.storm config log util timer])
    (:require [backtype.storm.daemon [executor :as executor]])
 +  (:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]])
 +  (:require [clojure.set :as set])
 +  (:require [backtype.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors])
    (:import [java.util ArrayList HashMap])
 -  (:import [backtype.storm.utils TransferDrainer])
 +  (:import [backtype.storm.utils Utils TransferDrainer ThriftTopologyUtils])
    (:import [backtype.storm.messaging TransportFactory])
-   (:import [backtype.storm.messaging TaskMessage IContext IConnection])
+   (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
 +  (:import [backtype.storm.daemon.common WorkerHeartbeat])
 +  (:import [backtype.storm.daemon Shutdownable])
 +  (:import [backtype.storm.serialization KryoTupleSerializer])
 +  (:import [backtype.storm.generated StormTopology])
 +  (:import [backtype.storm.tuple Fields])
 +  (:import [backtype.storm.task WorkerTopologyContext])
 +  (:import [backtype.storm Constants])
    (:import [backtype.storm.security.auth AuthUtils])
    (:import [javax.security.auth Subject])
    (:import [java.security PrivilegedExceptionAction])

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 38160f8,b152af2..51d03b1
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -16,8 -16,10 +16,9 @@@
  (ns backtype.storm.messaging.netty-unit-test
    (:use [clojure test])
    (:import [backtype.storm.messaging TransportFactory])
-   (:use [backtype.storm testing util config])
 -  (:use [backtype.storm bootstrap testing util])
 -  (:use [backtype.storm.daemon.worker :only [is-connection-ready]]))
 -
 -(bootstrap)
++  (:use [backtype.storm testing util config log])
++  (:use [backtype.storm.daemon.worker :only [is-connection-ready]])
 +  (:import [java.util ArrayList]))
  
  (def port 6700)
  (def task 1)
@@@ -68,39 -94,9 +93,40 @@@
      (.close server)
      (.term context)))
  
 +(deftest test-server-delayed
 +    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
 +       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-AUTHENTICATION false
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                    }
 +        context (TransportFactory/makeContext storm-conf)
 +        client (.connect context nil "localhost" port)
 +
 +        server (Thread.
 +                (fn []
 +                  (Thread/sleep 1000)
 +                  (let [server (.bind context nil port)
 +                        iter (.recv server 0 0)
 +                        resp (.next iter)]
 +                    (is (= task (.task resp)))
 +                    (is (= req_msg (String. (.message resp))))
 +                    (.close server)
 +                  )))
 +        _ (.start server)
 +        _ (.send client task (.getBytes req_msg))
 +        ]
 +    (.close client)
 +    (.join server)
 +    (.term context)))
 +
  (deftest test-batch
-   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+   (let [num-messages 100000
+         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                      STORM-MESSAGING-NETTY-AUTHENTICATION false
                      STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                      STORM-MESSAGING-NETTY-MAX-RETRIES 10

http://git-wip-us.apache.org/repos/asf/storm/blob/c2916315/storm-core/test/clj/backtype/storm/worker_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/worker_test.clj
index 0000000,f09baef..2e0533d
mode 000000,100644..100644
--- a/storm-core/test/clj/backtype/storm/worker_test.clj
+++ b/storm-core/test/clj/backtype/storm/worker_test.clj
@@@ -1,0 -1,38 +1,37 @@@
+ ;; Licensed to the Apache Software Foundation (ASF) under one
+ ;; or more contributor license agreements.  See the NOTICE file
+ ;; distributed with this work for additional information
+ ;; regarding copyright ownership.  The ASF licenses this file
+ ;; to you under the Apache License, Version 2.0 (the
+ ;; "License"); you may not use this file except in compliance
+ ;; with the License.  You may obtain a copy of the License at
+ ;;
+ ;; http://www.apache.org/licenses/LICENSE-2.0
+ ;;
+ ;; Unless required by applicable law or agreed to in writing, software
+ ;; distributed under the License is distributed on an "AS IS" BASIS,
+ ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ;; See the License for the specific language governing permissions and
+ ;; limitations under the License.
+ (ns backtype.storm.worker-test
+   (:use [clojure test])
+   (:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
+   (:import [org.mockito Mockito])
 -  (:use [backtype.storm bootstrap testing])
++  (:use [backtype.storm testing])
+   (:use [backtype.storm.daemon common])
+ 
+   (:require [backtype.storm.daemon [worker :as worker]])
+   )
+ 
 -(bootstrap)
+ 
+ (deftest test-worker-is-connection-ready
+   (let [connection (Mockito/mock ConnectionWithStatus)]
+     (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready)
+     (is (= true (worker/is-connection-ready connection)))
+ 
+     (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting)
+     (is (= false (worker/is-connection-ready connection)))
+ 
+     (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed)
+     (is (= false (worker/is-connection-ready connection)))
+   ))


[2/6] storm git commit: Track how many messages are being dropped when a connection is unavailable

Posted by pt...@apache.org.
Track how many messages are being dropped when a connection is unavailable


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b60ab23c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b60ab23c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b60ab23c

Branch: refs/heads/master
Commit: b60ab23c467b7bc3d7cd1de34b3ce52126148743
Parents: 205eaf4
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:01:27 2015 +0100
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Mon Feb 16 10:01:27 2015 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b60ab23c/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 5d99718..71aa794 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -437,6 +437,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private void dropPendingMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
+        messagesLost.getAndAdd(msgCount);
         LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 


[3/6] storm git commit: Clarify name of method for dropping messages

Posted by pt...@apache.org.
Clarify name of method for dropping messages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00613438
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00613438
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00613438

Branch: refs/heads/master
Commit: 006134383ed448084e358f993e34f12ba5b45dcf
Parents: b60ab23
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:03:07 2015 +0100
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Mon Feb 16 10:03:07 2015 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/00613438/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 71aa794..189fa95 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -431,14 +431,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
         LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
-        dropPendingMessages(msgs);
+        dropMessages(msgs);
     }
 
-    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
+    private void dropMessages(Iterator<TaskMessage> msgs) {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-        LOG.error("dropping {} pending message(s) destined for {}", msgCount, dstAddressPrefixedName);
+        LOG.error("dropping {} message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {


[6/6] storm git commit: fix merge mistake by removing test-server-delayed from netty_unit_test.clj

Posted by pt...@apache.org.
fix merge mistake by removing test-server-delayed from netty_unit_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d7334849
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d7334849
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d7334849

Branch: refs/heads/master
Commit: d7334849b8d1262c7e82dc6f6e5d59d904c9ae4e
Parents: c291631
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Feb 23 16:08:29 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Feb 23 16:08:29 2015 -0500

----------------------------------------------------------------------
 .../storm/messaging/netty_unit_test.clj         | 30 --------------------
 1 file changed, 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d7334849/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 51d03b1..7188d7a 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -93,36 +93,6 @@
     (.close server)
     (.term context)))
 
-(deftest test-server-delayed
-    (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
-       storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
-                    STORM-MESSAGING-NETTY-AUTHENTICATION false
-                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
-                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
-                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
-                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
-                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
-                    }
-        context (TransportFactory/makeContext storm-conf)
-        client (.connect context nil "localhost" port)
-
-        server (Thread.
-                (fn []
-                  (Thread/sleep 1000)
-                  (let [server (.bind context nil port)
-                        iter (.recv server 0 0)
-                        resp (.next iter)]
-                    (is (= task (.task resp)))
-                    (is (= req_msg (String. (.message resp))))
-                    (.close server)
-                  )))
-        _ (.start server)
-        _ (.send client task (.getBytes req_msg))
-        ]
-    (.close client)
-    (.join server)
-    (.term context)))
 
 (deftest test-batch
   (let [num-messages 100000


[4/6] storm git commit: Change log level for intentionally dropping messages from WARN to ERROR

Posted by pt...@apache.org.
Change log level for intentionally dropping messages from WARN to ERROR

This change makes the log level for dropping messages consistent in
Client.java.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/522d96e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/522d96e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/522d96e5

Branch: refs/heads/master
Commit: 522d96e5953037728422935d31401a58e943c688
Parents: 0061343
Author: Michael G. Noll <mn...@verisign.com>
Authored: Mon Feb 16 10:07:35 2015 +0100
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Mon Feb 16 10:07:35 2015 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/messaging/netty/Client.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/522d96e5/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 189fa95..7392d3e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -358,7 +358,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public synchronized void send(Iterator<TaskMessage> msgs) {
         if (closing) {
             int numMessages = iteratorSize(msgs);
-            LOG.warn("discarding {} messages because the Netty client to {} is being closed", numMessages,
+            LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
                 dstAddressPrefixedName);
             return;
         }
@@ -475,7 +475,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                     messagesSent.getAndAdd(batch.size());
                 }
                 else {
-                    LOG.warn("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                    LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
                         future.getCause());
                     closeChannelAndReconnect(future.getChannel());
                     messagesLost.getAndAdd(numMessages);