You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/03/13 19:34:05 UTC

[1/5] storm git commit: STORM-625: don't leak netty clients when workers move

Repository: storm
Updated Branches:
  refs/heads/master e77a300c5 -> 87a662730


STORM-625: don't leak netty clients when workers move


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5be9e9d9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5be9e9d9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5be9e9d9

Branch: refs/heads/master
Commit: 5be9e9d9cfe6fb497a7706308cc9c59061215cea
Parents: 2dbeb98
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 3 16:34:53 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 3 16:34:53 2015 -0600

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  6 +++++-
 .../backtype/storm/messaging/netty/Context.java | 21 ++++++++++++--------
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 7392d3e..dd8a196 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -147,9 +147,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private MessageBatch messageBatch = null;
     private final ListeningScheduledExecutorService scheduler;
     protected final Map stormConf;
+    private Context context;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, Context context) {
+        this.context = context;
         closing = false;
         this.stormConf = stormConf;
         this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
@@ -508,6 +510,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+            context.removeClient(this);
+            context = null;
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
             flushPendingMessages();

http://git-wip-us.apache.org/repos/asf/storm/blob/5be9e9d9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index f592aff..75a2d88 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -25,7 +25,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
-import java.util.Vector;
+import java.util.Set;
+import java.util.HashSet;
 
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
@@ -37,7 +38,7 @@ public class Context implements IContext {
         
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
-    private volatile Vector<IConnection> connections;
+    private Set<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
     private ScheduledExecutorService clientScheduleService;
@@ -49,7 +50,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new Vector<IConnection>();
+        connections = new HashSet<IConnection>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -71,7 +72,7 @@ public class Context implements IContext {
     /**
      * establish a server with a binding port
      */
-    public IConnection bind(String storm_id, int port) {
+    public synchronized IConnection bind(String storm_id, int port) {
         IConnection server = new Server(storm_conf, port);
         connections.add(server);
         return server;
@@ -80,20 +81,24 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public IConnection connect(String storm_id, String host, int port) {        
+    public synchronized IConnection connect(String storm_id, String host, int port) {
         IConnection client =  new Client(storm_conf, clientChannelFactory, 
-                clientScheduleService, host, port);
+                clientScheduleService, host, port, this);
         connections.add(client);
         return client;
     }
 
+    synchronized void removeClient(Client c) {
+        connections.remove(c);
+    }
+
     /**
      * terminate this context
      */
-    public void term() {
+    public synchronized void term() {
         clientScheduleService.shutdown();        
         
-        for (IConnection conn : connections) {
+        for (IConnection conn : new HashSet<IConnection>(connections)) {
             conn.close();
         }
         


[5/5] storm git commit: Adding STORM-625 to changelog.

Posted by kn...@apache.org.
Adding STORM-625 to changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87a66273
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87a66273
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87a66273

Branch: refs/heads/master
Commit: 87a6627302343305beaaade1ab8a64aad3b045ef
Parents: 145f542
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Mar 13 13:33:42 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Mar 13 13:33:42 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87a66273/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa2fdc5..1befac1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-625: Don't leak netty clients when workers move; reuse an existing netty client.
  * STORM-682: supervisor should handle worker state corruption gracefully.
  * STORM-446: Allow superusers to impersonate other users in secure mode.
  * STORM-659: return grep matches each on its own line.


[4/5] storm git commit: Merge branch 'storm625' of https://github.com/kishorvpatil/incubator-storm

Posted by kn...@apache.org.
Merge branch 'storm625' of https://github.com/kishorvpatil/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/145f5427
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/145f5427
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/145f5427

Branch: refs/heads/master
Commit: 145f542700e761572f7359b33e690bae1b446f9a
Parents: e77a300 b02f612
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Mar 13 12:19:46 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Mar 13 12:19:46 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  6 +++-
 .../backtype/storm/messaging/netty/Context.java | 33 ++++++++++++++------
 2 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------



[3/5] storm git commit: Reuse netty client in case worker already has one.

Posted by kn...@apache.org.
Reuse netty client in case worker already has one.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b02f6124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b02f6124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b02f6124

Branch: refs/heads/master
Commit: b02f612498b6310d41c0d5fb15d92300c8179094
Parents: 14bc9ce
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Mar 10 17:14:09 2015 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Mar 10 17:14:09 2015 +0000

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  2 +-
 .../backtype/storm/messaging/netty/Context.java | 26 +++++++++++++-------
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index dd8a196..945d986 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -510,7 +510,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
-            context.removeClient(this);
+            context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
             context = null;
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/b02f6124/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 75a2d88..615702d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -24,9 +24,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
 
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
@@ -38,7 +37,7 @@ public class Context implements IContext {
         
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
-    private Set<IConnection> connections;
+    private Map<String, IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
     private ScheduledExecutorService clientScheduleService;
@@ -50,7 +49,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new HashSet<IConnection>();
+        connections = new HashMap<String, IConnection>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,7 +73,7 @@ public class Context implements IContext {
      */
     public synchronized IConnection bind(String storm_id, int port) {
         IConnection server = new Server(storm_conf, port);
-        connections.add(server);
+        connections.put(key(storm_id, port), server);
         return server;
     }
 
@@ -82,14 +81,19 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public synchronized IConnection connect(String storm_id, String host, int port) {
+        IConnection connection = connections.get(key(host,port));
+        if(connection !=null)
+        {
+            return connection;
+        }
         IConnection client =  new Client(storm_conf, clientChannelFactory, 
                 clientScheduleService, host, port, this);
-        connections.add(client);
+        connections.put(key(host, port), client);
         return client;
     }
 
-    synchronized void removeClient(Client c) {
-        connections.remove(c);
+    synchronized void removeClient(String host, int port) {
+        connections.remove(key(host, port));
     }
 
     /**
@@ -98,7 +102,7 @@ public class Context implements IContext {
     public synchronized void term() {
         clientScheduleService.shutdown();        
         
-        for (IConnection conn : new HashSet<IConnection>(connections)) {
+        for (IConnection conn : connections.values()) {
             conn.close();
         }
         
@@ -114,4 +118,8 @@ public class Context implements IContext {
         clientChannelFactory.releaseExternalResources();
 
     }
+
+    private String key(String host, int port) {
+        return String.format("%s:%d", host, port);
+    }
 }


[2/5] storm git commit: Merge branch 'STORM-625' of https://github.com/revans2/incubator-storm into storm625

Posted by kn...@apache.org.
Merge branch 'STORM-625' of https://github.com/revans2/incubator-storm into storm625


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14bc9ce7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14bc9ce7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14bc9ce7

Branch: refs/heads/master
Commit: 14bc9ce73bfe7aa4e46c06eefac102ed693e40c8
Parents: 27a3606 5be9e9d
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Mar 10 16:32:21 2015 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Mar 10 16:32:21 2015 +0000

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |  6 +++++-
 .../backtype/storm/messaging/netty/Context.java | 21 ++++++++++++--------
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------