You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/11 18:12:19 UTC
[1/3] git commit: STORM-12 Reduce thread usage of Netty transport.
Repository: incubator-storm
Updated Branches:
refs/heads/master 1be3d0f65 -> 1a0b46e95
STORM-12 Reduce thread usage of Netty transport.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/94c4d4d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/94c4d4d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/94c4d4d9
Branch: refs/heads/master
Commit: 94c4d4d9e6c4ce736141668c585818214a9d26cf
Parents: c621a6c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 1 15:32:31 2014 +0000
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Apr 1 15:32:31 2014 +0000
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 184 +++++++++++++------
.../backtype/storm/messaging/netty/Context.java | 30 ++-
.../storm/messaging/netty/MessageBatch.java | 55 +-----
.../backtype/storm/messaging/netty/Server.java | 2 +-
.../messaging/netty/StormClientHandler.java | 80 +++-----
5 files changed, 183 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f15cd1d..6996b49 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -21,17 +21,21 @@ import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
+
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.Random;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,38 +43,37 @@ import java.util.concurrent.atomic.AtomicReference;
class Client implements IConnection {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ private static final Timer TIMER = new Timer("netty-client-timer", true);
+
private final int max_retries;
private final long base_sleep_ms;
private final long max_sleep_ms;
private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
private AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
- private InetSocketAddress remote_addr;
+ InetSocketAddress remote_addr;
private AtomicInteger retries;
private final Random random = new Random();
private final ChannelFactory factory;
private final int buffer_size;
private final AtomicBoolean being_closed;
+ private boolean wait_for_requests;
@SuppressWarnings("rawtypes")
- Client(Map storm_conf, String host, int port) {
+ Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+ this.factory = factory;
message_queue = new LinkedBlockingQueue<Object>();
retries = new AtomicInteger(0);
channelRef = new AtomicReference<Channel>(null);
being_closed = new AtomicBoolean(false);
+ wait_for_requests = false;
// Configure
buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
- int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
- if (maxWorkers > 0) {
- factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
- } else {
- factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- }
bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("sendBufferSize", buffer_size);
@@ -88,19 +91,24 @@ class Client implements IConnection {
* We will retry connection with exponential back-off policy
*/
void reconnect() {
- try {
- int tried_count = retries.incrementAndGet();
- if (tried_count <= max_retries) {
- Thread.sleep(getSleepTimeMs());
- LOG.info("Reconnect ... [{}]", tried_count);
- bootstrap.connect(remote_addr);
- LOG.debug("connection started...");
- } else {
- LOG.warn("Remote address is not reachable. We will close this client.");
- close();
- }
- } catch (InterruptedException e) {
- LOG.warn("connection failed", e);
+ close_n_release();
+
+ //reconnect only if it's not being closed
+ if (being_closed.get()) return;
+
+ final int tried_count = retries.incrementAndGet();
+ if (tried_count <= max_retries) {
+ long sleep = getSleepTimeMs();
+ LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
+ TIMER.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
+ bootstrap.connect(remote_addr);
+ }}, sleep);
+ } else {
+ LOG.warn(remote_addr+" is not reachable. We will close this client.");
+ close();
}
}
@@ -130,36 +138,94 @@ class Client implements IConnection {
try {
message_queue.put(new TaskMessage(task, message));
+
+ //resume delivery if it is waiting for requests
+ tryDeliverMessages(true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
+ * Retrieve messages from the queue and deliver them to the server, if there are any
+ */
+ synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
+ //skip if we were asked to deliver only when waiting, and we are not currently waiting
+ if (only_if_waiting && !wait_for_requests) return;
+
+ //make sure that channel was not closed
+ Channel channel = channelRef.get();
+ if (channel == null) return;
+ if (!channel.isOpen()) {
+ LOG.info("Channel to {} is no longer open.",remote_addr);
+ //The channel is not open yet. Reconnect?
+ reconnect();
+ return;
+ }
+
+ final MessageBatch requests = tryTakeMessages();
+ if (requests==null) {
+ wait_for_requests = true;
+ return;
+ }
+
+ //if channel is being closed and we have no outstanding messages, let's close the channel
+ if (requests.isEmpty() && being_closed.get()) {
+ close_n_release();
+ return;
+ }
+
+ //we are busily delivering messages, and will check queue upon response.
+ //When send() is called by senders, we should not thus call tryDeliverMessages().
+ wait_for_requests = false;
+
+ //write request into socket channel
+ ChannelFuture future = channel.write(requests);
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (!future.isSuccess()) {
+ LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
+ reconnect();
+ } else {
+ LOG.debug("{} request(s) sent", requests.size());
+
+ //Now that our requests have been sent, channel could be closed if needed
+ if (being_closed.get())
+ close_n_release();
+ }
+ }
+ });
+ }
+
+ /**
* Take all enqueued messages from queue
- * @return
+ * @return batch of messages
* @throws InterruptedException
+ *
+ * synchronized ... ensure that messages are delivered in the same order
+ * as they are added into queue
*/
- MessageBatch takeMessages() throws InterruptedException {
+ private MessageBatch tryTakeMessages() throws InterruptedException {
//1st message
- MessageBatch batch = new MessageBatch(buffer_size);
- Object msg = message_queue.take();
- batch.add(msg);
+ Object msg = message_queue.poll();
+ if (msg == null) return null;
+ MessageBatch batch = new MessageBatch(buffer_size);
//we will discard any message after CLOSE
- if (msg==ControlMessage.CLOSE_MESSAGE)
+ if (msg == ControlMessage.CLOSE_MESSAGE) {
+ LOG.info("Connection to {} is being closed", remote_addr);
+ being_closed.set(true);
return batch;
+ }
- while (!batch.isFull()) {
- //peek the next message
- msg = message_queue.peek();
- //no more messages
- if (msg == null) break;
-
- //we will discard any message after CLOSE
- if (msg==ControlMessage.CLOSE_MESSAGE) {
+ batch.add((TaskMessage)msg);
+ while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
+ //Is it a CLOSE message?
+ if (msg == ControlMessage.CLOSE_MESSAGE) {
message_queue.take();
- batch.add(msg);
+ LOG.info("Connection to {} is being closed", remote_addr);
+ being_closed.set(true);
break;
}
@@ -179,31 +245,29 @@ class Client implements IConnection {
*
* We will send all existing requests, and then invoke close_n_release() method
*/
- public synchronized void close() {
- if (!being_closed.get()) {
- //enqueue a CLOSE message so that shutdown() will be invoked
- try {
- message_queue.put(ControlMessage.CLOSE_MESSAGE);
- being_closed.set(true);
- } catch (InterruptedException e) {
- close_n_release();
- }
+ public void close() {
+ //enqueue a CLOSE message so that shutdown() will be invoked
+ try {
+ message_queue.put(ControlMessage.CLOSE_MESSAGE);
+
+ //resume delivery if it is waiting for requests
+ tryDeliverMessages(true);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted Connection to {} is being closed", remote_addr);
+ being_closed.set(true);
+ close_n_release();
}
}
/**
* close_n_release() is invoked after all messages have been sent.
*/
- void close_n_release() {
- if (channelRef.get() != null)
- channelRef.get().close().awaitUninterruptibly();
-
- //we need to release resources
- new Thread(new Runnable() {
- @Override
- public void run() {
- factory.releaseExternalResources();
- }}).start();
+ synchronized void close_n_release() {
+ if (channelRef.get() != null) {
+ channelRef.get().close();
+ LOG.debug("channel {} closed",remote_addr);
+ setChannel(null);
+ }
}
public TaskMessage recv(int flags) {
@@ -211,6 +275,10 @@ class Client implements IConnection {
}
void setChannel(Channel channel) {
+ if (channel != null && channel.isOpen()) {
+ //Assume the most recent connection attempt was successful.
+ retries.set(0);
+ }
channelRef.set(channel);
//reset retries
if (channel != null)
@@ -218,7 +286,3 @@ class Client implements IConnection {
}
}
-
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 3e09dd1..80b4443 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -17,8 +17,16 @@
*/
package backtype.storm.messaging.netty;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.Vector;
+
+import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
+import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Vector;
@@ -27,14 +35,25 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
private volatile Vector<IConnection> connections;
-
+ private NioClientSocketChannelFactory clientChannelFactory;
+
/**
* initialization per Storm configuration
*/
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
- this.storm_conf = storm_conf;
- connections = new Vector<IConnection>();
+ this.storm_conf = storm_conf;
+ connections = new Vector<IConnection>();
+
+ //each context will have a single client channel factory
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+ if (maxWorkers > 0) {
+ clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ }
}
/**
@@ -50,7 +69,7 @@ public class Context implements IContext {
* establish a connection to a remote server
*/
public IConnection connect(String storm_id, String host, int port) {
- IConnection client = new Client(storm_conf, host, port);
+ IConnection client = new Client(storm_conf, clientChannelFactory, host, port);
connections.add(client);
return client;
}
@@ -63,5 +82,8 @@ public class Context implements IContext {
conn.close();
}
connections = null;
+
+ //we need to release resources associated with client channel factory
+ clientChannelFactory.releaseExternalResources();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 9d287e4..cd8d4e3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -26,56 +26,22 @@ import java.util.ArrayList;
class MessageBatch {
private int buffer_size;
- private ArrayList<Object> msgs;
+ private ArrayList<TaskMessage> msgs;
private int encoded_length;
MessageBatch(int buffer_size) {
this.buffer_size = buffer_size;
- msgs = new ArrayList<Object>();
+ msgs = new ArrayList<TaskMessage>();
encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
}
- void add(Object obj) {
+ void add(TaskMessage obj) {
if (obj == null)
throw new RuntimeException("null object forbidded in message batch");
- if (obj instanceof TaskMessage) {
- TaskMessage msg = (TaskMessage)obj;
- msgs.add(msg);
- encoded_length += msgEncodeLength(msg);
- return;
- }
-
- if (obj instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage)obj;
- msgs.add(msg);
- encoded_length += msg.encodeLength();
- return;
- }
-
- throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
- }
-
- void remove(Object obj) {
- if (obj == null) return;
-
- if (obj instanceof TaskMessage) {
- TaskMessage msg = (TaskMessage)obj;
- msgs.remove(msg);
- encoded_length -= msgEncodeLength(msg);
- return;
- }
-
- if (obj instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage)obj;
- msgs.remove(msg);
- encoded_length -= msg.encodeLength();
- return;
- }
- }
-
- Object get(int index) {
- return msgs.get(index);
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.add(msg);
+ encoded_length += msgEncodeLength(msg);
}
/**
@@ -129,12 +95,9 @@ class MessageBatch {
ChannelBuffer buffer() throws Exception {
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
- for (Object msg : msgs)
- if (msg instanceof TaskMessage)
- writeTaskMessage(bout, (TaskMessage)msg);
- else
- ((ControlMessage)msg).write(bout);
-
+ for (TaskMessage msg : msgs)
+ writeTaskMessage(bout, msg);
+
//add a END_OF_BATCH indicator
ControlMessage.EOB_MESSAGE.write(bout);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index ad811b0..83e4187 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -123,7 +123,7 @@ class Server implements IConnection {
* close all channels, and release resources
*/
public synchronized void close() {
- if (allChannels != null) {
+ if (allChannels != null) {
allChannels.close().awaitUninterruptibly();
factory.releaseExternalResources();
allChannels = null;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 65c36a7..43a8c39 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -17,7 +17,15 @@
*/
package backtype.storm.messaging.netty;
-import org.jboss.netty.channel.*;
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,12 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class StormClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
private Client client;
- private AtomicBoolean being_closed;
- long start_time;
+ long start_time;
StormClientHandler(Client client) {
this.client = client;
- being_closed = new AtomicBoolean(false);
start_time = System.currentTimeMillis();
}
@@ -41,13 +47,14 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
//register the newly established channel
Channel channel = event.getChannel();
client.setChannel(channel);
- LOG.debug("connection established to a remote host");
+ LOG.info("connection established from "+channel.getLocalAddress()+" to "+channel.getRemoteAddress());
- //send next request
+ //send next batch of requests if any
try {
- sendRequests(channel, client.takeMessages());
- } catch (InterruptedException e) {
- channel.close();
+ client.tryDeliverMessages(false);
+ } catch (Exception ex) {
+ LOG.info("exception when sending messages:", ex.getMessage());
+ client.reconnect();
}
}
@@ -60,62 +67,21 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
if (msg==ControlMessage.FAILURE_RESPONSE)
LOG.info("failure response:{}", msg);
- //send next request
- Channel channel = event.getChannel();
+ //send next batch of requests if any
try {
- sendRequests(channel, client.takeMessages());
- } catch (InterruptedException e) {
- channel.close();
- }
- }
-
- /**
- * Retrieve a request from message queue, and send to server
- * @param channel
- */
- private void sendRequests(Channel channel, final MessageBatch requests) {
- if (requests==null || requests.size()==0 || being_closed.get()) return;
-
- //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
- Object last_msg = requests.get(requests.size()-1);
- if (last_msg==ControlMessage.CLOSE_MESSAGE) {
- being_closed.set(true);
- requests.remove(last_msg);
- }
-
- //we may don't need do anything if no requests found
- if (requests.isEmpty()) {
- if (being_closed.get())
- client.close_n_release();
- return;
+ client.tryDeliverMessages(false);
+ } catch (Exception ex) {
+ LOG.info("exception when sending messages:", ex.getMessage());
+ client.reconnect();
}
-
- //write request into socket channel
- ChannelFuture future = channel.write(requests);
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (!future.isSuccess()) {
- LOG.info("failed to send requests:", future.getCause());
- future.getChannel().close();
- } else {
- LOG.debug("{} request(s) sent", requests.size());
- }
- if (being_closed.get())
- client.close_n_release();
- }
- });
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
Throwable cause = event.getCause();
if (!(cause instanceof ConnectException)) {
- LOG.info("Connection failed:", cause);
- }
- if (!being_closed.get()) {
- client.setChannel(null);
- client.reconnect();
+ LOG.info("Connection to "+client.remote_addr+" failed:", cause);
}
+ client.reconnect();
}
}
[2/3] git commit: Merge branch 'netty-thread-usage' of
github.com:revans2/incubator-storm
Posted by pt...@apache.org.
Merge branch 'netty-thread-usage' of github.com:revans2/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/3e7abfc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/3e7abfc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/3e7abfc0
Branch: refs/heads/master
Commit: 3e7abfc0b0b260c6ac320ac9eca7f5208f0e4973
Parents: 1be3d0f 94c4d4d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 11 12:06:28 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 11 12:06:28 2014 -0400
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 184 +++++++++++++------
.../backtype/storm/messaging/netty/Context.java | 30 ++-
.../storm/messaging/netty/MessageBatch.java | 55 +-----
.../backtype/storm/messaging/netty/Server.java | 2 +-
.../messaging/netty/StormClientHandler.java | 80 +++-----
5 files changed, 183 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
[3/3] git commit: STORM-12: update changelog
Posted by pt...@apache.org.
STORM-12: update changelog
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/1a0b46e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/1a0b46e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/1a0b46e9
Branch: refs/heads/master
Commit: 1a0b46e95ab4ac467525314a75819a75dec92c40
Parents: 3e7abfc
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Apr 11 12:11:46 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Apr 11 12:11:46 2014 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/1a0b46e9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 764b9f4..1fd0e80 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.9.2-incubating (unreleased)
+ * STORM-12: reduce thread usage of netty transport
* STORM-281: fix an issue with config parsing that could lead to leaking file descriptors
* STORM-196: When JVM_OPTS are set, storm jar fails to detect storm.jar from environment
* STORM-260: Fix a potential race condition with simulated time in Storm's unit tests