You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:49 UTC
[10/37] storm git commit: Fix spacing with Client
Fix spacing with Client
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33903de8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33903de8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33903de8
Branch: refs/heads/master
Commit: 33903de82d30021885e73e16a2d479c504e0b163
Parents: 34b1373
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Oct 30 17:48:00 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Oct 30 17:48:00 2015 -0500
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 96 ++++++++++----------
1 file changed, 48 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33903de8/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 7ecd770..a23b699 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -217,7 +217,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
@Override
public Iterator<TaskMessage> recv(int flags, int clientId) {
throw new UnsupportedOperationException("Client connection should not receive any messages");
- }
+ }
@Override
public void send(int taskId, byte[] payload) {
@@ -235,12 +235,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
if (closing) {
int numMessages = iteratorSize(msgs);
LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
- dstAddressPrefixedName);
+ dstAddressPrefixedName);
return;
}
if (!hasMessages(msgs)) {
- return;
+ return;
}
Channel channel = getConnectedChannel();
@@ -281,7 +281,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
// because we know `Channel.isWritable` was false after the messages were already in the buffer.
}
- }
+ }
private Channel getConnectedChannel() {
Channel channel = channelRef.get();
@@ -296,7 +296,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
}
return null;
}
- }
+ }
public InetSocketAddress getDstAddress() {
return dstAddress;
@@ -311,7 +311,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// We consume the iterator by traversing and thus "emptying" it.
int msgCount = iteratorSize(msgs);
messagesLost.getAndAdd(msgCount);
- }
+ }
private int iteratorSize(Iterator<TaskMessage> msgs) {
int size = 0;
@@ -319,8 +319,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
while (msgs.hasNext()) {
size++;
msgs.next();
- }
}
+ }
return size;
}
@@ -340,21 +340,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
ChannelFuture future = channel.write(batch);
future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- pendingMessages.addAndGet(0 - numMessages);
- if (future.isSuccess()) {
- LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
- messagesSent.getAndAdd(batch.size());
- } else {
- LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
- future.getCause());
- closeChannelAndReconnect(future.getChannel());
- messagesLost.getAndAdd(numMessages);
+ public void operationComplete(ChannelFuture future) throws Exception {
+ pendingMessages.addAndGet(0 - numMessages);
+ if (future.isSuccess()) {
+ LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+ messagesSent.getAndAdd(batch.size());
+ } else {
+ LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+ future.getCause());
+ closeChannelAndReconnect(future.getChannel());
+ messagesLost.getAndAdd(numMessages);
+ }
}
- }
- });
- }
+ });
+ }
/**
* Schedule a reconnect if we closed a non-null channel, and acquired the right to
@@ -390,23 +390,23 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private void waitForPendingMessagesToBeSent() {
LOG.info("waiting up to {} ms to send {} pending messages to {}",
- PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+ PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
long totalPendingMsgs = pendingMessages.get();
long startMs = System.currentTimeMillis();
while (pendingMessages.get() != 0) {
- try {
+ try {
long deltaMs = System.currentTimeMillis() - startMs;
if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
- "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+ "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
break;
}
Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
}
catch (InterruptedException e) {
break;
+ }
}
- }
}
@@ -441,7 +441,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
/** ISaslClient interface **/
public void channelConnected(Channel channel) {
// setChannel(channel);
- }
+ }
public void channelReady() {
saslChannelReady.set(true);
@@ -502,7 +502,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private void reschedule(Throwable t) {
String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
- dstAddressPrefixedName);
+ dstAddressPrefixedName);
String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
LOG.error(failureMsg);
long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
@@ -519,34 +519,34 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
ChannelFuture future = bootstrap.connect(address);
future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- // This call returns immediately
- Channel newChannel = future.getChannel();
-
- if (future.isSuccess() && connectionEstablished(newChannel)) {
- boolean setChannel = channelRef.compareAndSet(null, newChannel);
- checkState(setChannel);
- LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
- connectionAttempt);
- if (messagesLost.get() > 0) {
- LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
- }
- } else {
- Throwable cause = future.getCause();
- reschedule(cause);
- if (newChannel != null) {
- newChannel.close();
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ // This call returns immediately
+ Channel newChannel = future.getChannel();
+
+ if (future.isSuccess() && connectionEstablished(newChannel)) {
+ boolean setChannel = channelRef.compareAndSet(null, newChannel);
+ checkState(setChannel);
+ LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+ connectionAttempt);
+ if (messagesLost.get() > 0) {
+ LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+ }
+ } else {
+ Throwable cause = future.getCause();
+ reschedule(cause);
+ if (newChannel != null) {
+ newChannel.close();
+ }
}
}
- }
- });
+ });
} else {
close();
throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
- connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+ connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
- }
+ }
}
}
}