You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/18 19:13:24 UTC

[1/3] storm git commit: [STORM-2042] Nimbus client connections not closed properly causing connection leaks

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 560746f8e -> 78da384c0


[STORM-2042] Nimbus client connections not closed properly causing connection leaks

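Make ThriftClient implement AutoCloseable, create Nimbus clients in try-with-resources blocks so they are always closed, and close any previously set client in NimbusBlobStore.setClient.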


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/495c7056
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/495c7056
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/495c7056

Branch: refs/heads/1.x-branch
Commit: 495c70563cb72241bdebf7e70429c38e60484443
Parents: e55684b
Author: Arun Mahadevan <ar...@apache.org>
Authored: Wed Aug 17 14:02:44 2016 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Wed Aug 17 14:02:44 2016 +0530

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/StormSubmitter.java       | 13 +++----------
 .../org/apache/storm/blobstore/NimbusBlobStore.java    |  3 +++
 .../org/apache/storm/security/auth/ThriftClient.java   |  3 ++-
 .../src/jvm/org/apache/storm/utils/NimbusClient.java   |  3 +--
 storm-core/src/jvm/org/apache/storm/utils/Utils.java   |  5 +----
 5 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/495c7056/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index eb7d33d..2232845 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -234,9 +234,8 @@ public class StormSubmitter {
                     throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                 }
                 String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, asUser);
-                try {
+                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)){
                     LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-                    NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
                     if (opts != null) {
                         client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
                     } else {
@@ -368,8 +367,7 @@ public class StormSubmitter {
     }
 
     private static boolean topologyNameExists(Map conf, String name, String asUser) {
-        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
-        try {
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
             ClusterSummary summary = client.getClient().getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
                 if(s.get_name().equals(name)) {
@@ -380,8 +378,6 @@ public class StormSubmitter {
 
         } catch(Exception e) {
             throw new RuntimeException(e);
-        } finally {
-            client.close();
         }
     }
 
@@ -405,8 +401,7 @@ public class StormSubmitter {
             throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
         }
 
-        NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);
-        try {
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
             String uploadLocation = client.getClient().beginFileUpload();
             LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
             BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
@@ -437,8 +432,6 @@ public class StormSubmitter {
             return uploadLocation;
         } catch(Exception e) {
             throw new RuntimeException(e);
-        } finally {
-            client.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/495c7056/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 76a4c1e..009a07c 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -398,6 +398,9 @@ public class NimbusBlobStore extends ClientBlobStore {
 
     @Override
     public boolean setClient(Map conf, NimbusClient client) {
+        if (this.client != null) {
+            this.client.close();
+        }
         this.client = client;
         if (conf != null) {
             this.bufferSize = Utils.getInt(conf.get(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES), bufferSize);

http://git-wip-us.apache.org/repos/asf/storm/blob/495c7056/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index 0a8e515..bcc1a11 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -27,7 +27,7 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.Config;
 
-public class ThriftClient {
+public class ThriftClient implements AutoCloseable {
     private TTransport _transport;
     protected TProtocol _protocol;
     private String _host;
@@ -106,6 +106,7 @@ public class ThriftClient {
         }
     }
 
+    @Override
     public synchronized void close() {
         if (_transport != null) {
             _transport.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/495c7056/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index af9aebd..456b459 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -62,8 +62,7 @@ public class NimbusClient extends ThriftClient {
         for (String host : seeds) {
             int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
             ClusterSummary clusterInfo;
-            try {
-                NimbusClient client = new NimbusClient(conf, host, port, null, asUser);
+            try (NimbusClient client = new NimbusClient(conf, host, port, null, asUser)) {
                 clusterInfo = client.getClient().getClusterInfo();
             } catch (Exception e) {
                 LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host

http://git-wip-us.apache.org/repos/asf/storm/blob/495c7056/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8575bd6..7a322ec 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1387,9 +1387,8 @@ public class Utils {
     }
 
     public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
-        NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser);
         TopologyInfo topologyInfo = null;
-        try {
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) {
             ClusterSummary summary = client.getClient().getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
                 if(s.get_name().equals(name)) {
@@ -1398,8 +1397,6 @@ public class Utils {
             }
         } catch(Exception e) {
             throw new RuntimeException(e);
-        } finally {
-            client.close();
         }
         return topologyInfo;
     }


[2/3] storm git commit: Merge branch 'STORM-2042' of https://github.com/arunmahadevan/storm into 1.x-branch

Posted by sr...@apache.org.
Merge branch 'STORM-2042' of https://github.com/arunmahadevan/storm into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12b70dd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12b70dd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12b70dd8

Branch: refs/heads/1.x-branch
Commit: 12b70dd86165f11e2d8a15642327e47e833b7dbe
Parents: 560746f 495c705
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Aug 18 11:24:29 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Aug 18 11:24:29 2016 -0700

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/StormSubmitter.java       | 13 +++----------
 .../org/apache/storm/blobstore/NimbusBlobStore.java    |  3 +++
 .../org/apache/storm/security/auth/ThriftClient.java   |  3 ++-
 .../src/jvm/org/apache/storm/utils/NimbusClient.java   |  3 +--
 storm-core/src/jvm/org/apache/storm/utils/Utils.java   |  5 +----
 5 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2042 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-2042 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78da384c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78da384c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78da384c

Branch: refs/heads/1.x-branch
Commit: 78da384c029202067f3770fef66ab1a991934dc2
Parents: 12b70dd
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Aug 18 11:25:48 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Aug 18 11:25:48 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/78da384c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dca6f9d..066dc3a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2042: Nimbus client connections not closed properly causing connection leaks
  * STORM-1766: A better algorithm server rack selection for RAS
  * STORM-1913: Additions and Improvements for Trident RAS API
  * STORM-2037: debug operation should be whitelisted in SimpleAclAuthorizer.