You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:50 UTC

[19/23] storm git commit: Clean up

Clean up


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5e134973
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5e134973
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5e134973

Branch: refs/heads/0.9.x-branch
Commit: 5e134973eadda2c76b1fd62a0f93281abf326688
Parents: c715e07
Author: Enno Shioji <es...@gmail.com>
Authored: Thu Jun 4 15:58:04 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Thu Jun 4 15:58:04 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 50 +++++------------
 .../backtype/storm/messaging/netty/Context.java |  4 +-
 .../storm/messaging/netty/MessageBatch.java     | 26 +--------
 .../storm/messaging/netty/MessageBatcher.java   | 37 -------------
 .../storm/messaging/netty/MessageBuffer.java    | 58 ++++++++++++++++++++
 .../messaging/netty/StormClientHandler.java     |  3 +-
 6 files changed, 77 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index edb9f2b..c779733 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -65,7 +65,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
-    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
 
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
@@ -116,7 +115,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     private final HashedWheelTimer scheduler;
 
-    private final MessageBatcher batcher;
+    private final MessageBuffer batcher;
 
     @SuppressWarnings("rawtypes")
     Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
@@ -136,8 +135,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         scheduleConnect(NO_DELAY_MS);
-
-        batcher = new MessageBatcher(messageBatchSize);
+        batcher = new MessageBuffer(messageBatchSize);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -249,16 +247,18 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             }
         }
 
-        if (!batcher.isEmpty() && channel.isWritable()) {
-            // Netty's internal buffer is not full and we still have message left in the batcher.
+        if(channel.isWritable()){
+            // Netty's internal buffer is not full and we may still have messages left in the buffer.
             // We should write the unfilled MessageBatch immediately to reduce latency
             MessageBatch batch = batcher.drain();
-            flushMessages(channel, batch);
+            if(batch != null) {
+                flushMessages(channel, batch);
+            }
         } else {
-            // We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
-            // In this situation, waiting for more messages before handing it to Netty yields better throughput
-            // The messages are already in the buffer, and we know that the writability was false at that point
-            // Therefore we can rely on Netty's writability change.
+            // Channel's buffer is full, meaning that we have time to wait for other messages to arrive, and create a bigger
+            // batch. This yields better throughput.
+            // We can rely on `notifyInterestChanged` to push these messages as soon as there is space in Netty's buffer
+            // because we know `Channel.isWritable` was false after the messages were already in the buffer.
         }
     }
 
@@ -288,10 +288,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         messagesLost.getAndAdd(msgCount);
     }
 
-    private void dropMessages(MessageBatch msgs) {
-        messagesLost.getAndAdd(msgs.size());
-    }
-
     private int iteratorSize(Iterator<TaskMessage> msgs) {
         int size = 0;
         if (msgs != null) {
@@ -409,32 +405,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     }
 
     /**
-     * Asynchronously flushes pending messages to the remote address, if they have not been
-     * flushed by other means.
-     * This task runs on a single thread shared among all clients, and thus
-     * should not perform operations that block or are expensive.
-     */
-    private final TimerTask FLUSH = new TimerTask() {
-        @Override
-        public void run(Timeout timeout) throws Exception {
-            MessageBatch toSend = batcher.drain();
-
-            Channel channel = getConnectedChannel();
-            if(channel == null) {
-                dropMessages(toSend);
-            } else {
-                flushMessages(channel, toSend);
-            }
-        }
-    };
-
-    /**
      * Called by Netty thread on change in channel interest
      * @param channel
      */
-    public void channelInterestChanged(Channel channel) {
+    public void notifyInterestChanged(Channel channel) {
         if(channel.isWritable()){
-            // Channel is writable again
+            // Channel is writable again, write if there are any messages pending
             MessageBatch pending = batcher.drain();
             flushMessages(channel, pending);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 64f67ba..bce02ac 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -62,8 +62,8 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
-        
-        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"), 10, TimeUnit.MILLISECONDS);
+
+        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 169940f..ec0dc0f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -35,32 +35,15 @@ class MessageBatch {
         encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
     }
 
-    void add(TaskMessage obj) {
-        if (obj == null)
-            throw new RuntimeException("null object forbidded in message batch");
+    void add(TaskMessage msg) {
+        if (msg == null)
+            throw new RuntimeException("null object forbidden in message batch");
 
-        TaskMessage msg = (TaskMessage)obj;
         msgs.add(msg);
         encoded_length += msgEncodeLength(msg);
     }
 
 
-    TaskMessage get(int index) {
-        return msgs.get(index);
-    }
-
-    /**
-     * try to add a TaskMessage to a batch
-     * @param taskMsg
-     * @return false if the msg could not be added due to buffer size limit; true otherwise
-     */
-    boolean tryAdd(TaskMessage taskMsg) {
-        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
-            return false;
-        add(taskMsg);
-        return true;
-    }
-
     private int msgEncodeLength(TaskMessage taskMsg) {
         if (taskMsg == null) return 0;
 
@@ -134,7 +117,4 @@ class MessageBatch {
             bout.write(message.message());
     }
 
-    public ArrayList<TaskMessage> getMsgs() {
-        return msgs;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
deleted file mode 100644
index e724a6d..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import backtype.storm.messaging.TaskMessage;
-
-/**
- * Enno Shioji
- */
-public class MessageBatcher {
-    private final int mesageBatchSize;
-    private MessageBatch currentBatch;
-
-    public MessageBatcher(int mesageBatchSize){
-        this.mesageBatchSize = mesageBatchSize;
-        this.currentBatch = new MessageBatch(mesageBatchSize);
-    }
-
-    public synchronized MessageBatch add(TaskMessage msg){
-        currentBatch.add(msg);
-        if(currentBatch.isFull()){
-            MessageBatch ret = currentBatch;
-            currentBatch = new MessageBatch(mesageBatchSize);
-            return ret;
-        } else {
-            return null;
-        }
-    }
-
-    public synchronized boolean isEmpty() {
-        return currentBatch.isEmpty();
-    }
-
-    public synchronized MessageBatch drain() {
-        MessageBatch ret = currentBatch;
-        currentBatch = new MessageBatch(mesageBatchSize);
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
new file mode 100644
index 0000000..d485e3a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Encapsulates the state used for batching up messages.
+ */
+public class MessageBuffer {
+    private final int mesageBatchSize;
+    private MessageBatch currentBatch;
+
+    public MessageBuffer(int mesageBatchSize){
+        this.mesageBatchSize = mesageBatchSize;
+        this.currentBatch = new MessageBatch(mesageBatchSize);
+    }
+
+    public synchronized MessageBatch add(TaskMessage msg){
+        currentBatch.add(msg);
+        if(currentBatch.isFull()){
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(mesageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+
+    public synchronized boolean isEmpty() {
+        return currentBatch.isEmpty();
+    }
+
+    public synchronized MessageBatch drain() {
+        if(!currentBatch.isEmpty()) {
+            MessageBatch ret = currentBatch;
+            currentBatch = new MessageBatch(mesageBatchSize);
+            return ret;
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index b91a76d..2d25001 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -33,8 +33,7 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler  {
 
     @Override
     public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-
-        client.channelInterestChanged(e.getChannel());
+        client.notifyInterestChanged(e.getChannel());
     }
 
     @Override