You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:50 UTC
[19/23] storm git commit: Clean up
Clean up
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5e134973
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5e134973
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5e134973
Branch: refs/heads/0.9.x-branch
Commit: 5e134973eadda2c76b1fd62a0f93281abf326688
Parents: c715e07
Author: Enno Shioji <es...@gmail.com>
Authored: Thu Jun 4 15:58:04 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Thu Jun 4 15:58:04 2015 +0100
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 50 +++++------------
.../backtype/storm/messaging/netty/Context.java | 4 +-
.../storm/messaging/netty/MessageBatch.java | 26 +--------
.../storm/messaging/netty/MessageBatcher.java | 37 -------------
.../storm/messaging/netty/MessageBuffer.java | 58 ++++++++++++++++++++
.../messaging/netty/StormClientHandler.java | 3 +-
6 files changed, 77 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index edb9f2b..c779733 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -65,7 +65,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
private static final long NO_DELAY_MS = 0L;
- private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
private final StormBoundedExponentialBackoffRetry retryPolicy;
private final ClientBootstrap bootstrap;
@@ -116,7 +115,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private final HashedWheelTimer scheduler;
- private final MessageBatcher batcher;
+ private final MessageBuffer batcher;
@SuppressWarnings("rawtypes")
Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
@@ -136,8 +135,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
scheduleConnect(NO_DELAY_MS);
-
- batcher = new MessageBatcher(messageBatchSize);
+ batcher = new MessageBuffer(messageBatchSize);
}
private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -249,16 +247,18 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
}
- if (!batcher.isEmpty() && channel.isWritable()) {
- // Netty's internal buffer is not full and we still have message left in the batcher.
+ if(channel.isWritable()){
+ // Netty's internal buffer is not full and we still have message left in the buffer.
// We should write the unfilled MessageBatch immediately to reduce latency
MessageBatch batch = batcher.drain();
- flushMessages(channel, batch);
+ if(batch != null) {
+ flushMessages(channel, batch);
+ }
} else {
- // We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
- // In this situation, waiting for more messages before handing it to Netty yields better throughput
- // The messages are already in the buffer, and we know that the writability was false at that point
- // Therefore we can rely on Netty's writability change.
+ // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
+ // batch. This yields better throughput.
+ // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
+ // because we know `Channel.isWritable` was false after the messages were already in the buffer.
}
}
@@ -288,10 +288,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
messagesLost.getAndAdd(msgCount);
}
- private void dropMessages(MessageBatch msgs) {
- messagesLost.getAndAdd(msgs.size());
- }
-
private int iteratorSize(Iterator<TaskMessage> msgs) {
int size = 0;
if (msgs != null) {
@@ -409,32 +405,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
/**
- * Asynchronously flushes pending messages to the remote address, if they have not been
- * flushed by other means.
- * This task runs on a single thread shared among all clients, and thus
- * should not perform operations that block or are expensive.
- */
- private final TimerTask FLUSH = new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- MessageBatch toSend = batcher.drain();
-
- Channel channel = getConnectedChannel();
- if(channel == null) {
- dropMessages(toSend);
- } else {
- flushMessages(channel, toSend);
- }
- }
- };
-
- /**
* Called by Netty thread on change in channel interest
* @param channel
*/
- public void channelInterestChanged(Channel channel) {
+ public void notifyInterestChanged(Channel channel) {
if(channel.isWritable()){
- // Channel is writable again
+ // Channel is writable again, write if there are any messages pending
MessageBatch pending = batcher.drain();
flushMessages(channel, pending);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 64f67ba..bce02ac 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -62,8 +62,8 @@ public class Context implements IContext {
clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory));
}
-
- clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"), 10, TimeUnit.MILLISECONDS);
+
+ clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"));
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 169940f..ec0dc0f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -35,32 +35,15 @@ class MessageBatch {
encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
}
- void add(TaskMessage obj) {
- if (obj == null)
- throw new RuntimeException("null object forbidded in message batch");
+ void add(TaskMessage msg) {
+ if (msg == null)
+ throw new RuntimeException("null object forbidden in message batch");
- TaskMessage msg = (TaskMessage)obj;
msgs.add(msg);
encoded_length += msgEncodeLength(msg);
}
- TaskMessage get(int index) {
- return msgs.get(index);
- }
-
- /**
- * try to add a TaskMessage to a batch
- * @param taskMsg
- * @return false if the msg could not be added due to buffer size limit; true otherwise
- */
- boolean tryAdd(TaskMessage taskMsg) {
- if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
- return false;
- add(taskMsg);
- return true;
- }
-
private int msgEncodeLength(TaskMessage taskMsg) {
if (taskMsg == null) return 0;
@@ -134,7 +117,4 @@ class MessageBatch {
bout.write(message.message());
}
- public ArrayList<TaskMessage> getMsgs() {
- return msgs;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
deleted file mode 100644
index e724a6d..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import backtype.storm.messaging.TaskMessage;
-
-/**
- * Enno Shioji
- */
-public class MessageBatcher {
- private final int mesageBatchSize;
- private MessageBatch currentBatch;
-
- public MessageBatcher(int mesageBatchSize){
- this.mesageBatchSize = mesageBatchSize;
- this.currentBatch = new MessageBatch(mesageBatchSize);
- }
-
- public synchronized MessageBatch add(TaskMessage msg){
- currentBatch.add(msg);
- if(currentBatch.isFull()){
- MessageBatch ret = currentBatch;
- currentBatch = new MessageBatch(mesageBatchSize);
- return ret;
- } else {
- return null;
- }
- }
-
- public synchronized boolean isEmpty() {
- return currentBatch.isEmpty();
- }
-
- public synchronized MessageBatch drain() {
- MessageBatch ret = currentBatch;
- currentBatch = new MessageBatch(mesageBatchSize);
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
new file mode 100644
index 0000000..d485e3a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBuffer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Encapsulates the state used for batching up messages.
+ */
+public class MessageBuffer {
+ private final int mesageBatchSize;
+ private MessageBatch currentBatch;
+
+ public MessageBuffer(int mesageBatchSize){
+ this.mesageBatchSize = mesageBatchSize;
+ this.currentBatch = new MessageBatch(mesageBatchSize);
+ }
+
+ public synchronized MessageBatch add(TaskMessage msg){
+ currentBatch.add(msg);
+ if(currentBatch.isFull()){
+ MessageBatch ret = currentBatch;
+ currentBatch = new MessageBatch(mesageBatchSize);
+ return ret;
+ } else {
+ return null;
+ }
+ }
+
+ public synchronized boolean isEmpty() {
+ return currentBatch.isEmpty();
+ }
+
+ public synchronized MessageBatch drain() {
+ if(!currentBatch.isEmpty()) {
+ MessageBatch ret = currentBatch;
+ currentBatch = new MessageBatch(mesageBatchSize);
+ return ret;
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5e134973/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index b91a76d..2d25001 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -33,8 +33,7 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-
- client.channelInterestChanged(e.getChannel());
+ client.notifyInterestChanged(e.getChannel());
}
@Override