You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:34 UTC

[03/23] storm git commit: Remove background flushing because it doesn't seem necessary. Netty's Channel queues up written data on an unbounded buffer. The background flushing seems to have been added to avoid this, but in practice it was probably doing i

Remove background flushing because it doesn't seem necessary. Netty's Channel queues up written data on an unbounded buffer. The background flushing seems to have been added to avoid this, but in practice the client was probably queuing on that buffer anyway, because flushMessages(), which is called by send(), doesn't check isWritable. Moreover, queuing on an unbounded buffer seems fine because back pressure is provided by MAX_PENDING_TUPLE. If an OOME occurs due to this buffer overflowing, it seems reasonable that one has to reduce MAX_PENDING_TUPLE, rather than Storm trying to cope with it by dropping messages.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b7d84bdc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b7d84bdc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b7d84bdc

Branch: refs/heads/0.9.x-branch
Commit: b7d84bdc7fd3de34f45a94131cdbb6bfbd3763dc
Parents: 91b8eb3
Author: Enno Shioji <es...@gmail.com>
Authored: Thu May 28 22:27:31 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Thu May 28 22:27:31 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 31 +-------------------
 1 file changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b7d84bdc/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 4e97035..4bbe989 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -128,8 +128,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private final ListeningScheduledExecutorService scheduler;
     protected final Map stormConf;
 
-    private AtomicReference<MessageBatch> pendingMessageBatch;
-
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
@@ -145,8 +143,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
-        pendingMessageBatch = new AtomicReference<MessageBatch>(new MessageBatch(messageBatchSize));
-
         // Initiate connection to remote destination
         bootstrap = createClientBootstrap(factory, bufferSize);
         dstAddress = new InetSocketAddress(host, port);
@@ -155,8 +151,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         // Launch background flushing thread
         long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts);
-        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs,
-                TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -175,28 +169,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return "";
     }
 
-    private Runnable createBackgroundFlusher() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                if (!closing) {
-                    LOG.debug("flushing pending messages to {} in background", dstAddressPrefixedName);
-                    flushPendingMessages();
-                }
-            }
-        };
-    }
-
-    private void flushPendingMessages() {
-        Channel channel = channelRef.get();
-        if (connectionEstablished(channel)) {
-            MessageBatch toFlush = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
-            flushMessages(channel, toFlush);
-        } else {
-            closeChannelAndReconnect(channel);
-        }
-    }
-
     /**
      * We will retry connection with exponential back-off policy
      */
@@ -276,7 +248,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-        MessageBatch toSend = pendingMessageBatch.getAndSet(new MessageBatch(messageBatchSize));
+        MessageBatch toSend = new MessageBatch(messageBatchSize);
 
         // Collect messages into batches (to optimize network throughput), then flush them.
         while (msgs.hasNext()) {
@@ -381,7 +353,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
-            flushPendingMessages();
             closeChannel();
         }
     }