You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/12/18 15:28:50 UTC

[1/2] storm git commit: STORM-3302: Ensures we close sockets to HDFS

Repository: storm
Updated Branches:
  refs/heads/master a9c2f3adb -> 339b1e6f3


STORM-3302: Ensures we close sockets to HDFS


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/845787d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/845787d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/845787d6

Branch: refs/heads/master
Commit: 845787d671dcdf4475493ea5888f2021b1422ba9
Parents: 730c1a3
Author: Derek Dagit <de...@oath.com>
Authored: Wed Dec 12 10:43:28 2018 -0600
Committer: Derek Dagit <de...@oath.com>
Committed: Wed Dec 12 10:43:28 2018 -0600

----------------------------------------------------------------------
 .../org/apache/storm/blobstore/BlobStore.java   | 14 ++++++---
 .../storm/dependency/DependencyUploader.java    | 20 +++++++++---
 .../apache/storm/blobstore/BlobStoreUtils.java  | 33 ++++++++++++--------
 .../storm/nimbus/LeaderListenerCallback.java    |  3 +-
 4 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index cb2928c..6cf9df9 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -289,12 +289,16 @@ public abstract class BlobStore implements Shutdownable, AutoCloseable {
                 out.write(buffer, 0, len);
             }
             out.close();
-        } catch (AuthorizationException | IOException | RuntimeException e) {
-            if (out != null) {
-                out.cancel();
-            }
+            out = null;
         } finally {
-            in.close();
+            try {
+                if (out != null) {
+                    out.cancel();
+                }
+                in.close();
+            } catch (IOException throwaway) {
+                // Ignored
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 41c1d86..d8f8c5a 100644
--- a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -154,11 +154,23 @@ public class DependencyUploader {
             acls.add(new AccessControl(AccessControlType.OTHER,
                                        BlobStoreAclHandler.READ));
 
-            AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
-            Files.copy(dependency.toPath(), blob);
-            blob.close();
+            AtomicOutputStream blob = null;
+            try {
+                blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
+                Files.copy(dependency.toPath(), blob);
+                blob.close();
+                blob = null;
 
-            uploadNew = true;
+                uploadNew = true;
+            } finally {
+                try {
+                    if (blob != null) {
+                        blob.cancel();
+                    }
+                } catch (IOException throwaway) {
+                    // Ignore.
+                }
+            }
         }
 
         return uploadNew;

http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index b9f93db..00d833f 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -122,7 +122,6 @@ public class BlobStoreUtils {
         throws TTransportException {
         ReadableBlobMeta rbm;
         ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
         boolean isSuccess = false;
         LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
         for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -134,8 +133,9 @@ public class BlobStoreUtils {
                 rbm = client.getClient().getBlobMeta(key);
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+                    blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+                }
                 // if key already exists while creating the blob else update it
                 Iterator<String> keyIterator = blobStore.listKeys();
                 while (keyIterator.hasNext()) {
@@ -170,8 +170,7 @@ public class BlobStoreUtils {
     public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
         throws TTransportException {
         ClientBlobStore remoteBlobStore;
-        InputStreamWithMeta in;
-        AtomicOutputStream out;
+        AtomicOutputStream out = null;
         boolean isSuccess = false;
         LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
         for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -181,15 +180,15 @@ public class BlobStoreUtils {
             try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
                 remoteBlobStore = new NimbusBlobStore();
                 remoteBlobStore.setClient(conf, client);
-                in = remoteBlobStore.getBlob(key);
-                out = blobStore.updateBlob(key, getNimbusSubject());
-                byte[] buffer = new byte[2048];
-                int len = 0;
-                while ((len = in.read(buffer)) > 0) {
-                    out.write(buffer, 0, len);
-                }
-                if (out != null) {
+                try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+                    out = blobStore.updateBlob(key, getNimbusSubject());
+                    byte[] buffer = new byte[2048];
+                    int len = 0;
+                    while ((len = in.read(buffer)) > 0) {
+                        out.write(buffer, 0, len);
+                    }
                     out.close();
+                    out = null;
                 }
                 isSuccess = true;
             } catch(FileNotFoundException fnf) {
@@ -204,6 +203,14 @@ public class BlobStoreUtils {
             } catch (Exception exp) {
                 // Logging an exception while client is connecting
                 LOG.error("Exception", exp);
+            } finally {
+                if (out != null) {
+                    try {
+                        out.cancel();
+                    } catch (IOException e) {
+                        // Ignore.
+                    }
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index edd7444..2e1a6ca 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -196,8 +196,7 @@ public class LeaderListenerCallback {
         Subject subject = ReqContext.context().subject();
 
         for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
-            try {
-                InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject);
+            try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
                 byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
                 StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
                 if (stormCode.is_set_dependency_jars()) {


[2/2] storm git commit: Merge branch 'storm-3302-fix-socket-leaks' of github.com:d2r/storm into STORM-3302

Posted by bo...@apache.org.
Merge branch 'storm-3302-fix-socket-leaks' of github.com:d2r/storm into STORM-3302

STORM-3302: Ensures we close streams to HDFS

this closes #2925


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/339b1e6f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/339b1e6f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/339b1e6f

Branch: refs/heads/master
Commit: 339b1e6f34d8209ff8c1afcdea815781bfbe22eb
Parents: a9c2f3a 845787d
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Dec 18 09:04:29 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Dec 18 09:04:29 2018 -0600

----------------------------------------------------------------------
 .../org/apache/storm/blobstore/BlobStore.java   | 14 ++++++---
 .../storm/dependency/DependencyUploader.java    | 20 +++++++++---
 .../apache/storm/blobstore/BlobStoreUtils.java  | 33 ++++++++++++--------
 .../storm/nimbus/LeaderListenerCallback.java    |  3 +-
 4 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------