You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:49 UTC

[10/37] storm git commit: Fix spacing with Client

Fix spacing with Client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33903de8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33903de8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33903de8

Branch: refs/heads/master
Commit: 33903de82d30021885e73e16a2d479c504e0b163
Parents: 34b1373
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Oct 30 17:48:00 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Oct 30 17:48:00 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 96 ++++++++++----------
 1 file changed, 48 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/33903de8/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 7ecd770..a23b699 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -217,7 +217,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     @Override
     public Iterator<TaskMessage> recv(int flags, int clientId) {
         throw new UnsupportedOperationException("Client connection should not receive any messages");
-        }
+    }
 
     @Override
     public void send(int taskId, byte[] payload) {
@@ -235,12 +235,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         if (closing) {
             int numMessages = iteratorSize(msgs);
             LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
-                    dstAddressPrefixedName);
+                      dstAddressPrefixedName);
             return;
         }
 
         if (!hasMessages(msgs)) {
-          return;
+            return;
         }
 
         Channel channel = getConnectedChannel();
@@ -281,7 +281,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
             // because we know `Channel.isWritable` was false after the messages were already in the buffer.
         }
-        }
+    }
 
     private Channel getConnectedChannel() {
         Channel channel = channelRef.get();
@@ -296,7 +296,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             }
             return null;
         }
-        }
+    }
 
     public InetSocketAddress getDstAddress() {
         return dstAddress;
@@ -311,7 +311,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-                    }
+    }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
         int size = 0;
@@ -319,8 +319,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             while (msgs.hasNext()) {
                 size++;
                 msgs.next();
-                }
             }
+        }
         return size;
     }
 
@@ -340,21 +340,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future) throws Exception {
-                pendingMessages.addAndGet(0 - numMessages);
-                if (future.isSuccess()) {
-                    LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
-                    messagesSent.getAndAdd(batch.size());
-                } else {
-                    LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
-                            future.getCause());
-                    closeChannelAndReconnect(future.getChannel());
-                    messagesLost.getAndAdd(numMessages);
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    pendingMessages.addAndGet(0 - numMessages);
+                    if (future.isSuccess()) {
+                        LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+                        messagesSent.getAndAdd(batch.size());
+                    } else {
+                        LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                                  future.getCause());
+                        closeChannelAndReconnect(future.getChannel());
+                        messagesLost.getAndAdd(numMessages);
+                    }
                 }
-            }
 
-        });
-        }
+            });
+    }
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to
@@ -390,23 +390,23 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
     private void waitForPendingMessagesToBeSent() {
         LOG.info("waiting up to {} ms to send {} pending messages to {}",
-                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+                 PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
         long totalPendingMsgs = pendingMessages.get();
         long startMs = System.currentTimeMillis();
         while (pendingMessages.get() != 0) {
-        try {
+            try {
                 long deltaMs = System.currentTimeMillis() - startMs;
                 if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                     LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
-                            "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                              "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                     break;
                 }
                 Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
             }
             catch (InterruptedException e) {
                 break;
+            }
         }
-    }
 
     }
 
@@ -441,7 +441,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     /** ISaslClient interface **/
     public void channelConnected(Channel channel) {
 //        setChannel(channel);
-        }
+    }
 
     public void channelReady() {
         saslChannelReady.set(true);
@@ -502,7 +502,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
         private void reschedule(Throwable t) {
             String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
-                    dstAddressPrefixedName);
+                                           dstAddressPrefixedName);
             String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
             LOG.error(failureMsg);
             long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
@@ -519,34 +519,34 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                 LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
                 ChannelFuture future = bootstrap.connect(address);
                 future.addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        // This call returns immediately
-                        Channel newChannel = future.getChannel();
-
-                        if (future.isSuccess() && connectionEstablished(newChannel)) {
-                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
-                            checkState(setChannel);
-                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
-                                    connectionAttempt);
-                            if (messagesLost.get() > 0) {
-                                LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-    }
-                        } else {
-                            Throwable cause = future.getCause();
-                            reschedule(cause);
-                            if (newChannel != null) {
-                                newChannel.close();
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            // This call returns immediately
+                            Channel newChannel = future.getChannel();
+
+                            if (future.isSuccess() && connectionEstablished(newChannel)) {
+                                boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                                checkState(setChannel);
+                                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                                          connectionAttempt);
+                                if (messagesLost.get() > 0) {
+                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                                }
+                            } else {
+                                Throwable cause = future.getCause();
+                                reschedule(cause);
+                                if (newChannel != null) {
+                                    newChannel.close();
+                                }
                             }
                         }
-                    }
-                });
+                    });
             } else {
                 close();
                 throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                        connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                                           connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
-    }
+            }
         }
     }
 }