You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/12/18 15:28:50 UTC
[1/2] storm git commit: STORM-3302: Ensures we close sockets to HDFS
Repository: storm
Updated Branches:
refs/heads/master a9c2f3adb -> 339b1e6f3
STORM-3302: Ensures we close sockets to HDFS
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/845787d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/845787d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/845787d6
Branch: refs/heads/master
Commit: 845787d671dcdf4475493ea5888f2021b1422ba9
Parents: 730c1a3
Author: Derek Dagit <de...@oath.com>
Authored: Wed Dec 12 10:43:28 2018 -0600
Committer: Derek Dagit <de...@oath.com>
Committed: Wed Dec 12 10:43:28 2018 -0600
----------------------------------------------------------------------
.../org/apache/storm/blobstore/BlobStore.java | 14 ++++++---
.../storm/dependency/DependencyUploader.java | 20 +++++++++---
.../apache/storm/blobstore/BlobStoreUtils.java | 33 ++++++++++++--------
.../storm/nimbus/LeaderListenerCallback.java | 3 +-
4 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index cb2928c..6cf9df9 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -289,12 +289,16 @@ public abstract class BlobStore implements Shutdownable, AutoCloseable {
out.write(buffer, 0, len);
}
out.close();
- } catch (AuthorizationException | IOException | RuntimeException e) {
- if (out != null) {
- out.cancel();
- }
+ out = null;
} finally {
- in.close();
+ try {
+ if (out != null) {
+ out.cancel();
+ }
+ in.close();
+ } catch (IOException throwaway) {
+ // Ignored
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 41c1d86..d8f8c5a 100644
--- a/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -154,11 +154,23 @@ public class DependencyUploader {
acls.add(new AccessControl(AccessControlType.OTHER,
BlobStoreAclHandler.READ));
- AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
- Files.copy(dependency.toPath(), blob);
- blob.close();
+ AtomicOutputStream blob = null;
+ try {
+ blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
+ Files.copy(dependency.toPath(), blob);
+ blob.close();
+ blob = null;
- uploadNew = true;
+ uploadNew = true;
+ } finally {
+ try {
+ if (blob != null) {
+ blob.cancel();
+ }
+ } catch (IOException throwaway) {
+ // Ignore.
+ }
+ }
}
return uploadNew;
http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index b9f93db..00d833f 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -122,7 +122,6 @@ public class BlobStoreUtils {
throws TTransportException {
ReadableBlobMeta rbm;
ClientBlobStore remoteBlobStore;
- InputStreamWithMeta in;
boolean isSuccess = false;
LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -134,8 +133,9 @@ public class BlobStoreUtils {
rbm = client.getClient().getBlobMeta(key);
remoteBlobStore = new NimbusBlobStore();
remoteBlobStore.setClient(conf, client);
- in = remoteBlobStore.getBlob(key);
- blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+ try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+ blobStore.createBlob(key, in, rbm.get_settable(), getNimbusSubject());
+ }
// if key already exists while creating the blob else update it
Iterator<String> keyIterator = blobStore.listKeys();
while (keyIterator.hasNext()) {
@@ -170,8 +170,7 @@ public class BlobStoreUtils {
public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
throws TTransportException {
ClientBlobStore remoteBlobStore;
- InputStreamWithMeta in;
- AtomicOutputStream out;
+ AtomicOutputStream out = null;
boolean isSuccess = false;
LOG.debug("Download blob NimbusInfos {}", nimbusInfos);
for (NimbusInfo nimbusInfo : nimbusInfos) {
@@ -181,15 +180,15 @@ public class BlobStoreUtils {
try (NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) {
remoteBlobStore = new NimbusBlobStore();
remoteBlobStore.setClient(conf, client);
- in = remoteBlobStore.getBlob(key);
- out = blobStore.updateBlob(key, getNimbusSubject());
- byte[] buffer = new byte[2048];
- int len = 0;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- if (out != null) {
+ try (InputStreamWithMeta in = remoteBlobStore.getBlob(key)) {
+ out = blobStore.updateBlob(key, getNimbusSubject());
+ byte[] buffer = new byte[2048];
+ int len = 0;
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
out.close();
+ out = null;
}
isSuccess = true;
} catch(FileNotFoundException fnf) {
@@ -204,6 +203,14 @@ public class BlobStoreUtils {
} catch (Exception exp) {
// Logging an exception while client is connecting
LOG.error("Exception", exp);
+ } finally {
+ if (out != null) {
+ try {
+ out.cancel();
+ } catch (IOException e) {
+ // Ignore.
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/845787d6/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index edd7444..2e1a6ca 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -196,8 +196,7 @@ public class LeaderListenerCallback {
Subject subject = ReqContext.context().subject();
for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
- try {
- InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject);
+ try (InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject)) {
byte[] blobContent = IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue());
StormTopology stormCode = Utils.deserialize(blobContent, StormTopology.class);
if (stormCode.is_set_dependency_jars()) {
[2/2] storm git commit: Merge branch 'storm-3302-fix-socket-leaks' of
github.com:d2r/storm into STORM-3302
Posted by bo...@apache.org.
Merge branch 'storm-3302-fix-socket-leaks' of github.com:d2r/storm into STORM-3302
STORM-3302: Ensures we close streams to HDFS
this closes #2925
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/339b1e6f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/339b1e6f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/339b1e6f
Branch: refs/heads/master
Commit: 339b1e6f34d8209ff8c1afcdea815781bfbe22eb
Parents: a9c2f3a 845787d
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Dec 18 09:04:29 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Dec 18 09:04:29 2018 -0600
----------------------------------------------------------------------
.../org/apache/storm/blobstore/BlobStore.java | 14 ++++++---
.../storm/dependency/DependencyUploader.java | 20 +++++++++---
.../apache/storm/blobstore/BlobStoreUtils.java | 33 ++++++++++++--------
.../storm/nimbus/LeaderListenerCallback.java | 3 +-
4 files changed, 46 insertions(+), 24 deletions(-)
----------------------------------------------------------------------