You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/03/13 19:34:05 UTC
[1/5] storm git commit: STORM-625: don't leak netty clients when
workers move
Repository: storm
Updated Branches:
refs/heads/master e77a300c5 -> 87a662730
STORM-625: don't leak netty clients when workers move
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5be9e9d9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5be9e9d9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5be9e9d9
Branch: refs/heads/master
Commit: 5be9e9d9cfe6fb497a7706308cc9c59061215cea
Parents: 2dbeb98
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 3 16:34:53 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 3 16:34:53 2015 -0600
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 6 +++++-
.../backtype/storm/messaging/netty/Context.java | 21 ++++++++++++--------
2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 7392d3e..dd8a196 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -147,9 +147,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private MessageBatch messageBatch = null;
private final ListeningScheduledExecutorService scheduler;
protected final Map stormConf;
+ private Context context;
@SuppressWarnings("rawtypes")
- Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
+ Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, Context context) {
+ this.context = context;
closing = false;
this.stormConf = stormConf;
this.scheduler = MoreExecutors.listeningDecorator(scheduler);
@@ -508,6 +510,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
public void close() {
if (!closing) {
LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+ context.removeClient(this);
+ context = null;
// Set closing to true to prevent any further reconnection attempts.
closing = true;
flushPendingMessages();
http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index f592aff..75a2d88 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -25,7 +25,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.Map;
-import java.util.Vector;
+import java.util.Set;
+import java.util.HashSet;
import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
@@ -37,7 +38,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
- private volatile Vector<IConnection> connections;
+ private Set<IConnection> connections;
private NioClientSocketChannelFactory clientChannelFactory;
private ScheduledExecutorService clientScheduleService;
@@ -49,7 +50,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
- connections = new Vector<IConnection>();
+ connections = new HashSet<IConnection>();
//each context will have a single client channel factory
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -71,7 +72,7 @@ public class Context implements IContext {
/**
* establish a server with a binding port
*/
- public IConnection bind(String storm_id, int port) {
+ public synchronized IConnection bind(String storm_id, int port) {
IConnection server = new Server(storm_conf, port);
connections.add(server);
return server;
@@ -80,20 +81,24 @@ public class Context implements IContext {
/**
* establish a connection to a remote server
*/
- public IConnection connect(String storm_id, String host, int port) {
+ public synchronized IConnection connect(String storm_id, String host, int port) {
IConnection client = new Client(storm_conf, clientChannelFactory,
- clientScheduleService, host, port);
+ clientScheduleService, host, port, this);
connections.add(client);
return client;
}
+ synchronized void removeClient(Client c) {
+ connections.remove(c);
+ }
+
/**
* terminate this context
*/
- public void term() {
+ public synchronized void term() {
clientScheduleService.shutdown();
- for (IConnection conn : connections) {
+ for (IConnection conn : new HashSet<IConnection>(connections)) {
conn.close();
}
[5/5] storm git commit: Adding STORM-625 to changelog.
Posted by kn...@apache.org.
Adding STORM-625 to changelog.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87a66273
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87a66273
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87a66273
Branch: refs/heads/master
Commit: 87a6627302343305beaaade1ab8a64aad3b045ef
Parents: 145f542
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Mar 13 13:33:42 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Mar 13 13:33:42 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/87a66273/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa2fdc5..1befac1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.0
+ * STORM-625: Don't leak netty clients when worker moves or reuse netty client.
* STORM-682: supervisor should handle worker state corruption gracefully.
* STORM-446: Allow superusers to impersonate other users in secure mode.
* STORM-659: return grep matches each on its own line.
[4/5] storm git commit: Merge branch 'storm625' of
https://github.com/kishorvpatil/incubator-storm
Posted by kn...@apache.org.
Merge branch 'storm625' of https://github.com/kishorvpatil/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/145f5427
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/145f5427
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/145f5427
Branch: refs/heads/master
Commit: 145f542700e761572f7359b33e690bae1b446f9a
Parents: e77a300 b02f612
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Mar 13 12:19:46 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Mar 13 12:19:46 2015 -0500
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 6 +++-
.../backtype/storm/messaging/netty/Context.java | 33 ++++++++++++++------
2 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
[3/5] storm git commit: Reuse netty client in case worker already has
one.
Posted by kn...@apache.org.
Reuse netty client in case worker already has one.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b02f6124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b02f6124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b02f6124
Branch: refs/heads/master
Commit: b02f612498b6310d41c0d5fb15d92300c8179094
Parents: 14bc9ce
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Mar 10 17:14:09 2015 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Mar 10 17:14:09 2015 +0000
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 2 +-
.../backtype/storm/messaging/netty/Context.java | 26 +++++++++++++-------
2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index dd8a196..945d986 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -510,7 +510,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
public void close() {
if (!closing) {
LOG.info("closing Netty Client {}", dstAddressPrefixedName);
- context.removeClient(this);
+ context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
context = null;
// Set closing to true to prevent any further reconnection attempts.
closing = true;
http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 75a2d88..615702d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -24,9 +24,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
@@ -38,7 +37,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
- private Set<IConnection> connections;
+ private Map<String, IConnection> connections;
private NioClientSocketChannelFactory clientChannelFactory;
private ScheduledExecutorService clientScheduleService;
@@ -50,7 +49,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
- connections = new HashSet<IConnection>();
+ connections = new HashMap<String, IConnection>();
//each context will have a single client channel factory
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,7 +73,7 @@ public class Context implements IContext {
*/
public synchronized IConnection bind(String storm_id, int port) {
IConnection server = new Server(storm_conf, port);
- connections.add(server);
+ connections.put(key(storm_id, port), server);
return server;
}
@@ -82,14 +81,19 @@ public class Context implements IContext {
* establish a connection to a remote server
*/
public synchronized IConnection connect(String storm_id, String host, int port) {
+ IConnection connection = connections.get(key(host,port));
+ if(connection !=null)
+ {
+ return connection;
+ }
IConnection client = new Client(storm_conf, clientChannelFactory,
clientScheduleService, host, port, this);
- connections.add(client);
+ connections.put(key(host, port), client);
return client;
}
- synchronized void removeClient(Client c) {
- connections.remove(c);
+ synchronized void removeClient(String host, int port) {
+ connections.remove(key(host, port));
}
/**
@@ -98,7 +102,7 @@ public class Context implements IContext {
public synchronized void term() {
clientScheduleService.shutdown();
- for (IConnection conn : new HashSet<IConnection>(connections)) {
+ for (IConnection conn : connections.values()) {
conn.close();
}
@@ -114,4 +118,8 @@ public class Context implements IContext {
clientChannelFactory.releaseExternalResources();
}
+
+ private String key(String host, int port) {
+ return String.format("%s:%d", host, port);
+ }
}
[2/5] storm git commit: Merge branch 'STORM-625' of
https://github.com/revans2/incubator-storm into storm625
Posted by kn...@apache.org.
Merge branch 'STORM-625' of https://github.com/revans2/incubator-storm into storm625
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14bc9ce7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14bc9ce7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14bc9ce7
Branch: refs/heads/master
Commit: 14bc9ce73bfe7aa4e46c06eefac102ed693e40c8
Parents: 27a3606 5be9e9d
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Mar 10 16:32:21 2015 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Mar 10 16:32:21 2015 +0000
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Client.java | 6 +++++-
.../backtype/storm/messaging/netty/Context.java | 21 ++++++++++++--------
2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------