You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:42 UTC

[11/23] storm git commit: Log error message for dropping messages only once per connection error (logging it every time on send was flooding the log).

Log error message for dropping messages only once per connection error (logging it every time on send was flooding the log).


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad8112d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad8112d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad8112d1

Branch: refs/heads/0.9.x-branch
Commit: ad8112d10d662ae81498d11f78a602b97243a142
Parents: aa5c2d7
Author: Enno Shioji <es...@gmail.com>
Authored: Sun May 31 00:54:31 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Sun May 31 00:54:31 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 24 ++++++++++++++++----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ad8112d1/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 340d43b..187bba3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -233,7 +233,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         Channel channel = channelRef.get();
         if (!connectionEstablished(channel)) {
             // Closing the channel and reconnecting should be done before handling the messages.
-            closeChannelAndReconnect(channel);
+            boolean reconnectScheduled = closeChannelAndReconnect(channel);
+            if(reconnectScheduled){
+                // Log the connection error only once
+                LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+            }
             handleMessagesWhenConnectionIsUnavailable(msgs);
             return;
         }
@@ -267,7 +271,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * succeed  or not, and how long the recovery will take.
      */
     private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
-        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
         dropMessages(msgs);
     }
 
@@ -275,7 +278,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-        LOG.error("dropping {} message(s) destined for {}", msgCount, dstAddressPrefixedName);
     }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
@@ -318,13 +320,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         });
     }
 
-    private void closeChannelAndReconnect(Channel channel) {
+    /**
+     * Schedule a reconnect if we closed a non-null channel, and acquired the right to
+     * provide a replacement
+     * @param channel
+     * @return if the call scheduled a re-connect task
+     */
+    private boolean closeChannelAndReconnect(Channel channel) {
         if (channel != null) {
             channel.close();
             if (channelRef.compareAndSet(channel, null)) {
                 scheduleConnect(NO_DELAY_MS);
+                return true;
             }
         }
+        return false;
     }
 
     private boolean containsMessages(MessageBatch batch) {
@@ -431,6 +441,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                                 checkState(setChannel);
                                 LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
                                         connectionAttempt);
+                                if(messagesLost.get() > 0){
+                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                                }
                             } else {
                                 Throwable cause = future.getCause();
                                 reschedule(cause);
@@ -443,7 +456,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                 } else {
                     close();
                     throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                            connectionAttempts + " failed attempts");
+                            connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+
                 }
             } catch (Throwable e) {
                 LOG.error("Uncaught throwable", e);