You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:49 UTC

[18/23] storm git commit: Use Netty's callback to perform the background flush

Use Netty's callback to perform the background flush


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c715e07a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c715e07a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c715e07a

Branch: refs/heads/0.9.x-branch
Commit: c715e07a47b498b8a9207dbc853ca4015167b54b
Parents: 832b5db
Author: Enno Shioji <es...@gmail.com>
Authored: Thu Jun 4 11:23:59 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Thu Jun 4 11:23:59 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 164 +++++--------------
 .../storm/messaging/netty/MessageBatcher.java   |  37 +++++
 .../netty/StormClientErrorHandler.java          |  41 -----
 .../messaging/netty/StormClientHandler.java     |  47 ++++++
 .../netty/StormClientPipelineFactory.java       |   2 +-
 5 files changed, 125 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 0d75448..edb9f2b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -65,6 +65,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
+    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
 
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
@@ -113,18 +114,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private volatile boolean closing = false;
 
-    /**
-     * How many messages should be batched together before sending them to the remote destination.
-     *
-     * Messages are batched to optimize network throughput at the expense of latency.
-     */
-    private final int messageBatchSize;
-
     private final HashedWheelTimer scheduler;
 
-    private final Object pendingMessageLock = new Object();
-    private MessageBatch pendingMessage;
-    private Timeout pendingFlush;
+    private final MessageBatcher batcher;
 
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
@@ -132,7 +124,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         this.scheduler = scheduler;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
-        messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
 
         maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
@@ -145,9 +137,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         dstAddressPrefixedName = prefixedName(dstAddress);
         scheduleConnect(NO_DELAY_MS);
 
-        // Dummy values to avoid null checks
-        pendingMessage = new MessageBatch(messageBatchSize);
-        pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+        batcher = new MessageBatcher(messageBatchSize);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -250,90 +240,28 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
-        MessageBatch replacement = new MessageBatch(messageBatchSize);
-        MessageBatch previous;
-        synchronized (pendingMessageLock) {
-            // pendingMessage is never null
-            previous = pendingMessage;
-            pendingMessage = replacement;
-
-            // We are flushing the pending messages, therefore we can cancel the current pending flush
-            // The cancel is idempotent
-            pendingFlush.cancel();
-        }
-
-        // Collect messages into batches (to optimize network throughput)
-        Batches batches = createBatches(previous, msgs);
 
-        // Then flush the batches that are full
-        flushMessages(channel, batches.fullBatches);
-
-        if (batches.unfilled.isEmpty()) {
-            // All messages ended up neatly into batches; there are no unfilled MessageBatch
-            return;
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            MessageBatch full = batcher.add(message);
+            if(full != null){
+                flushMessages(channel, full);
+            }
         }
 
-        if (channel.isWritable()) {
-            // Netty's internal buffer is not full. We should write the unfilled MessageBatch immediately
-            // to reduce latency
-            flushMessages(channel, batches.unfilled);
+        if (!batcher.isEmpty() && channel.isWritable()) {
+            // Netty's internal buffer is not full and we still have messages left in the batcher.
+            // We should write the unfilled MessageBatch immediately to reduce latency.
+            MessageBatch batch = batcher.drain();
+            flushMessages(channel, batch);
         } else {
             // We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
             // In this situation, waiting for more messages before handing it to Netty yields better throughput
-            queueUp(channel, batches.unfilled);
-        }
-    }
-
-    private void queueUp(Channel channel, MessageBatch unfilled) {
-        Batches batches;
-        synchronized (pendingMessageLock) {
-            batches = createBatches(pendingMessage, unfilled.getMsgs().iterator());
-            pendingMessage = batches.unfilled;
-
-            if(!pendingMessage.isEmpty()) {
-                // We have a MessageBatch that isn't full yet, so we will wait for more messages.
-                // However, we don't want to wait indefinitely so we schedule a timeout which flushes
-                // this batch if it's still not flushed after a delay
-
-                // First, cancel the currently pending flush, because we just saw that Netty's
-                // buffer is full and thus we know we can wait longer
-                pendingFlush.cancel();
-
-                // Schedule the new flush
-                pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        // MessageBatches that were filled are immediately handed to Netty
-        flushMessages(channel, batches.fullBatches);
-
-    }
-
-
-    private static class Batches {
-        final List<MessageBatch> fullBatches;
-        final MessageBatch unfilled;
-
-        private Batches(List<MessageBatch> fullBatches, MessageBatch unfilled) {
-            this.fullBatches = fullBatches;
-            this.unfilled = unfilled;
+            // The messages are already in the buffer, and we know that the channel was not writable at that point.
+            // Therefore we can rely on Netty's writability change callback (channelInterestChanged) to flush them.
         }
     }
 
-    private Batches createBatches(MessageBatch previous, Iterator<TaskMessage> msgs){
-        List<MessageBatch> ret = new ArrayList<MessageBatch>();
-        while (msgs.hasNext()) {
-            TaskMessage message = msgs.next();
-            previous.add(message);
-            if (previous.isFull()) {
-                ret.add(previous);
-                previous = new MessageBatch(messageBatchSize);
-            }
-        }
-
-        return new Batches(ret, previous);
-    }
-
     private Channel getConnectedChannel() {
         Channel channel = channelRef.get();
         if (connectionEstablished(channel)) {
@@ -375,19 +303,16 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return size;
     }
 
-    private void flushMessages(Channel channel, List<MessageBatch> batches) {
-        for (MessageBatch batch : batches) {
-            flushMessages(channel, batch);
-        }
-    }
-
-
     /**
      * Asynchronously writes the message batch to the channel.
      *
      * If the write operation fails, then we will close the channel and trigger a reconnect.
      */
     private void flushMessages(Channel channel, final MessageBatch batch) {
+        if(batch.isEmpty()){
+            return;
+        }
+        
         final int numMessages = batch.size();
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         pendingMessages.addAndGet(numMessages);
@@ -489,39 +414,30 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block or are expensive.
      */
-    private class Flush implements TimerTask {
-        private final MessageBatch instructor;
-
-        private Flush(MessageBatch instructor) {
-            this.instructor = instructor;
-        }
-
+    private final TimerTask FLUSH = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
-            MessageBatch toSend;
-            MessageBatch replacement = new MessageBatch(messageBatchSize);
-            synchronized (pendingMessageLock){
-                if(instructor == pendingMessage){
-                    // It's still the batch which scheduled this timeout
-                    toSend = pendingMessage;
-                    pendingMessage = replacement;
-                    checkState(!toSend.isFull(), "Only unfilled batches should get timeouts scheduled");
-                } else {
-                    // It's no longer the batch which scheduled this timeout
-                    // No need to work on this one
-                    toSend = null;
-                }
-            }
+            MessageBatch toSend = batcher.drain();
 
-            if(toSend!=null){
-                Channel channel = getConnectedChannel();
-                if(channel == null) {
-                    dropMessages(toSend);
-                } else {
-                    flushMessages(channel, toSend);
-                }
+            Channel channel = getConnectedChannel();
+            if(channel == null) {
+                dropMessages(toSend);
+            } else {
+                flushMessages(channel, toSend);
             }
         }
+    };
+
+    /**
+     * Called by Netty thread on change in channel interest; drains and flushes the batcher if the channel is writable.
+     * @param channel the channel whose interest changed
+     */
+    public void channelInterestChanged(Channel channel) {
+        if(channel.isWritable()){
+            // Channel is writable again
+            MessageBatch pending = batcher.drain();
+            flushMessages(channel, pending);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
new file mode 100644
index 0000000..e724a6d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
@@ -0,0 +1,37 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Enno Shioji
+ */
+public class MessageBatcher {
+    private final int mesageBatchSize;
+    private MessageBatch currentBatch;
+
+    public MessageBatcher(int mesageBatchSize){
+        this.mesageBatchSize = mesageBatchSize;
+        this.currentBatch = new MessageBatch(mesageBatchSize);
+    }
+
+    public synchronized MessageBatch add(TaskMessage msg){
+        currentBatch.add(msg);
+        if(currentBatch.isFull()){
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(mesageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+
+    public synchronized boolean isEmpty() {
+        return currentBatch.isEmpty();
+    }
+
+    public synchronized MessageBatch drain() {
+        MessageBatch ret = currentBatch;
+        currentBatch = new MessageBatch(mesageBatchSize);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
deleted file mode 100644
index ae317aa..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-
-import org.jboss.netty.channel.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormClientErrorHandler extends SimpleChannelUpstreamHandler  {
-    private static final Logger LOG = LoggerFactory.getLogger(StormClientErrorHandler.class);
-    private String name;
-    
-    StormClientErrorHandler(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
-        if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection failed " + name, cause);
-        } 
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..b91a76d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.net.ConnectException;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+    private Client client;
+    
+    StormClientHandler(Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+
+        client.channelInterestChanged(e.getChannel());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        Throwable cause = event.getCause();
+        if (!(cause instanceof ConnectException)) {
+            LOG.info("Connection failed " + client.dstAddressPrefixedName, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 73c50a1..6bad8e3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -37,7 +37,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
         // business logic.
-        pipeline.addLast("handler", new StormClientErrorHandler(client.dstAddressPrefixedName));
+        pipeline.addLast("handler", new StormClientHandler(client));
 
         return pipeline;
     }