You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:49 UTC
[18/23] storm git commit: Use Netty's callback to perform the
background flush
Use Netty's callback to perform the background flush
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c715e07a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c715e07a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c715e07a
Branch: refs/heads/0.9.x-branch
Commit: c715e07a47b498b8a9207dbc853ca4015167b54b
Parents: 832b5db
Author: Enno Shioji <es...@gmail.com>
Authored: Thu Jun 4 11:23:59 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Thu Jun 4 11:23:59 2015 +0100
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 164 +++++--------------
.../storm/messaging/netty/MessageBatcher.java | 37 +++++
.../netty/StormClientErrorHandler.java | 41 -----
.../messaging/netty/StormClientHandler.java | 47 ++++++
.../netty/StormClientPipelineFactory.java | 2 +-
5 files changed, 125 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 0d75448..edb9f2b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -65,6 +65,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
private static final long NO_DELAY_MS = 0L;
+ private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
private final StormBoundedExponentialBackoffRetry retryPolicy;
private final ClientBootstrap bootstrap;
@@ -113,18 +114,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
*/
private volatile boolean closing = false;
- /**
- * How many messages should be batched together before sending them to the remote destination.
- *
- * Messages are batched to optimize network throughput at the expense of latency.
- */
- private final int messageBatchSize;
-
private final HashedWheelTimer scheduler;
- private final Object pendingMessageLock = new Object();
- private MessageBatch pendingMessage;
- private Timeout pendingFlush;
+ private final MessageBatcher batcher;
@SuppressWarnings("rawtypes")
Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
@@ -132,7 +124,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
this.scheduler = scheduler;
int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
- messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+ int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
@@ -145,9 +137,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
dstAddressPrefixedName = prefixedName(dstAddress);
scheduleConnect(NO_DELAY_MS);
- // Dummy values to avoid null checks
- pendingMessage = new MessageBatch(messageBatchSize);
- pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+ batcher = new MessageBatcher(messageBatchSize);
}
private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -250,90 +240,28 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
return;
}
- MessageBatch replacement = new MessageBatch(messageBatchSize);
- MessageBatch previous;
- synchronized (pendingMessageLock) {
- // pendingMessage is never null
- previous = pendingMessage;
- pendingMessage = replacement;
-
- // We are flushing the pending messages, therefore we can cancel the current pending flush
- // The cancel is idempotent
- pendingFlush.cancel();
- }
-
- // Collect messages into batches (to optimize network throughput)
- Batches batches = createBatches(previous, msgs);
- // Then flush the batches that are full
- flushMessages(channel, batches.fullBatches);
-
- if (batches.unfilled.isEmpty()) {
- // All messages ended up neatly into batches; there are no unfilled MessageBatch
- return;
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ MessageBatch full = batcher.add(message);
+ if(full != null){
+ flushMessages(channel, full);
+ }
}
- if (channel.isWritable()) {
- // Netty's internal buffer is not full. We should write the unfilled MessageBatch immediately
- // to reduce latency
- flushMessages(channel, batches.unfilled);
+ if (!batcher.isEmpty() && channel.isWritable()) {
+ // Netty's internal buffer is not full and we still have messages left in the batcher.
+ // We should write the unfilled MessageBatch immediately to reduce latency
+ MessageBatch batch = batcher.drain();
+ flushMessages(channel, batch);
} else {
// We have an unfilled MessageBatch, but Netty's internal buffer is full, meaning that we have time.
// In this situation, waiting for more messages before handing it to Netty yields better throughput
- queueUp(channel, batches.unfilled);
- }
- }
-
- private void queueUp(Channel channel, MessageBatch unfilled) {
- Batches batches;
- synchronized (pendingMessageLock) {
- batches = createBatches(pendingMessage, unfilled.getMsgs().iterator());
- pendingMessage = batches.unfilled;
-
- if(!pendingMessage.isEmpty()) {
- // We have a MessageBatch that isn't full yet, so we will wait for more messages.
- // However, we don't want to wait indefinitely so we schedule a timeout which flushes
- // this batch if it's still not flushed after a delay
-
- // First, cancel the currently pending flush, because we just saw that Netty's
- // buffer is full and thus we know we can wait longer
- pendingFlush.cancel();
-
- // Schedule the new flush
- pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
- }
- }
-
- // MessageBatches that were filled are immediately handed to Netty
- flushMessages(channel, batches.fullBatches);
-
- }
-
-
- private static class Batches {
- final List<MessageBatch> fullBatches;
- final MessageBatch unfilled;
-
- private Batches(List<MessageBatch> fullBatches, MessageBatch unfilled) {
- this.fullBatches = fullBatches;
- this.unfilled = unfilled;
+ // The messages are already in the buffer, and we know that the writability was false at that point
+ // Therefore we can rely on Netty's writability change.
}
}
- private Batches createBatches(MessageBatch previous, Iterator<TaskMessage> msgs){
- List<MessageBatch> ret = new ArrayList<MessageBatch>();
- while (msgs.hasNext()) {
- TaskMessage message = msgs.next();
- previous.add(message);
- if (previous.isFull()) {
- ret.add(previous);
- previous = new MessageBatch(messageBatchSize);
- }
- }
-
- return new Batches(ret, previous);
- }
-
private Channel getConnectedChannel() {
Channel channel = channelRef.get();
if (connectionEstablished(channel)) {
@@ -375,19 +303,16 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
return size;
}
- private void flushMessages(Channel channel, List<MessageBatch> batches) {
- for (MessageBatch batch : batches) {
- flushMessages(channel, batch);
- }
- }
-
-
/**
* Asynchronously writes the message batch to the channel.
*
* If the write operation fails, then we will close the channel and trigger a reconnect.
*/
private void flushMessages(Channel channel, final MessageBatch batch) {
+ if(batch.isEmpty()){
+ return;
+ }
+
final int numMessages = batch.size();
LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
pendingMessages.addAndGet(numMessages);
@@ -489,39 +414,30 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
* This task runs on a single thread shared among all clients, and thus
* should not perform operations that block or are expensive.
*/
- private class Flush implements TimerTask {
- private final MessageBatch instructor;
-
- private Flush(MessageBatch instructor) {
- this.instructor = instructor;
- }
-
+ private final TimerTask FLUSH = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
- MessageBatch toSend;
- MessageBatch replacement = new MessageBatch(messageBatchSize);
- synchronized (pendingMessageLock){
- if(instructor == pendingMessage){
- // It's still the batch which scheduled this timeout
- toSend = pendingMessage;
- pendingMessage = replacement;
- checkState(!toSend.isFull(), "Only unfilled batches should get timeouts scheduled");
- } else {
- // It's no longer the batch which scheduled this timeout
- // No need to work on this one
- toSend = null;
- }
- }
+ MessageBatch toSend = batcher.drain();
- if(toSend!=null){
- Channel channel = getConnectedChannel();
- if(channel == null) {
- dropMessages(toSend);
- } else {
- flushMessages(channel, toSend);
- }
+ Channel channel = getConnectedChannel();
+ if(channel == null) {
+ dropMessages(toSend);
+ } else {
+ flushMessages(channel, toSend);
}
}
+ };
+
+ /**
+ * Called by Netty thread on change in channel interest
+ * @param channel the channel whose interest changed
+ */
+ public void channelInterestChanged(Channel channel) {
+ if(channel.isWritable()){
+ // Channel is writable again
+ MessageBatch pending = batcher.drain();
+ flushMessages(channel, pending);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
new file mode 100644
index 0000000..e724a6d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatcher.java
@@ -0,0 +1,37 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Collects TaskMessages into a current MessageBatch and hands batches back to the caller; every public method is synchronized on this instance. Author: Enno Shioji
+ */
+public class MessageBatcher {
+ private final int mesageBatchSize; // size argument given to every MessageBatch this batcher creates
+ private MessageBatch currentBatch; // batch currently being filled; only ever replaced with a new batch, never set to null
+ /** Creates a batcher whose first batch, and every later one, is built with the given size. */
+ public MessageBatcher(int mesageBatchSize){
+ this.mesageBatchSize = mesageBatchSize;
+ this.currentBatch = new MessageBatch(mesageBatchSize);
+ }
+ /** Adds msg to the current batch; once isFull() reports true, returns that batch and starts a new one, otherwise returns null. */
+ public synchronized MessageBatch add(TaskMessage msg){
+ currentBatch.add(msg);
+ if(currentBatch.isFull()){
+ MessageBatch ret = currentBatch;
+ currentBatch = new MessageBatch(mesageBatchSize);
+ return ret;
+ } else {
+ return null; // not full yet: nothing for the caller to flush
+ }
+ }
+ /** Reports whether the current batch is empty, as answered by MessageBatch.isEmpty(). */
+ public synchronized boolean isEmpty() {
+ return currentBatch.isEmpty();
+ }
+ /** Returns the current batch regardless of how full it is and replaces it with a new one; the result is never null. */
+ public synchronized MessageBatch drain() {
+ MessageBatch ret = currentBatch;
+ currentBatch = new MessageBatch(mesageBatchSize);
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
deleted file mode 100644
index ae317aa..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-
-import org.jboss.netty.channel.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormClientErrorHandler extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(StormClientErrorHandler.class);
- private String name;
-
- StormClientErrorHandler(String name) {
- this.name = name;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
- Throwable cause = event.getCause();
- if (!(cause instanceof ConnectException)) {
- LOG.info("Connection failed " + name, cause);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..b91a76d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.net.ConnectException;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Upstream channel handler for a Client: forwards channel-interest changes to the Client and logs exceptions other than ConnectException. */
+public class StormClientHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+ private Client client;
+ /** Creates a handler bound to the given client. */
+ StormClientHandler(Client client) {
+ this.client = client;
+ }
+ /** Forwards the interest-change event to the client. */
+ @Override
+ public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // Delegate to the Client, passing the channel the event belongs to.
+ client.channelInterestChanged(e.getChannel());
+ }
+ /** Logs the cause of the exception at INFO level unless it is a ConnectException. */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+ Throwable cause = event.getCause();
+ if (!(cause instanceof ConnectException)) { // NOTE(review): ConnectException is skipped, presumably because reconnects are handled elsewhere; confirm
+ LOG.info("Connection failed " + client.dstAddressPrefixedName, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c715e07a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 73c50a1..6bad8e3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -37,7 +37,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
// Encoder
pipeline.addLast("encoder", new MessageEncoder());
// business logic.
- pipeline.addLast("handler", new StormClientErrorHandler(client.dstAddressPrefixedName));
+ pipeline.addLast("handler", new StormClientHandler(client));
return pipeline;
}