You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/08/03 15:22:39 UTC
[1/2] storm git commit: STORM-3168 prevent AsyncLocalizer cleanup from crashing
Repository: storm
Updated Branches:
refs/heads/master 5ef7c1d35 -> aff667643
STORM-3168 prevent AsyncLocalizer cleanup from crashing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c6e102a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c6e102a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c6e102a
Branch: refs/heads/master
Commit: 5c6e102aa4e4d5c8f9515fa919f38130872eb18a
Parents: 146beff
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Wed Aug 1 14:53:53 2018 -0500
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Thu Aug 2 13:45:35 2018 -0500
----------------------------------------------------------------------
.../apache/storm/localizer/AsyncLocalizer.java | 99 +++++++++++---------
1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c6e102a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 19a8cd3..9c2596d 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -567,60 +567,67 @@ public class AsyncLocalizer implements AutoCloseable {
@VisibleForTesting
void cleanup() {
- LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
- // need one large set of all and then clean via LRU
- for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
- toClean.addResources(t.getValue());
- LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
- }
+ try {
+ LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
+ // need one large set of all and then clean via LRU
+ for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
+ toClean.addResources(t.getValue());
+ LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
+ }
- for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
- toClean.addResources(t.getValue());
- LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
- }
+ for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
+ toClean.addResources(t.getValue());
+ LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
+ }
- toClean.addResources(topologyBlobs);
- try (ClientBlobStore store = getClientBlobStore()) {
- toClean.cleanup(store);
- }
+ toClean.addResources(topologyBlobs);
+ try (ClientBlobStore store = getClientBlobStore()) {
+ toClean.cleanup(store);
+ }
- HashSet<String> safeTopologyIds = new HashSet<>();
- for (String blobKey : topologyBlobs.keySet()) {
- safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
- }
+ HashSet<String> safeTopologyIds = new HashSet<>();
+ for (String blobKey : topologyBlobs.keySet()) {
+ safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
+ }
- //Deleting this early does not hurt anything
- topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
- blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
+ //Deleting this early does not hurt anything
+ topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
+ blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
- try {
- forEachTopologyDistDir((p, topologyId) -> {
- if (!safeTopologyIds.contains(topologyId)) {
- fsOps.deleteIfExists(p.toFile());
- }
- });
- } catch (Exception e) {
- LOG.error("Could not read topology directories for cleanup", e);
- }
+ try {
+ forEachTopologyDistDir((p, topologyId) -> {
+ if (!safeTopologyIds.contains(topologyId)) {
+ fsOps.deleteIfExists(p.toFile());
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Could not read topology directories for cleanup", e);
+ }
- LOG.debug("Resource cleanup: {}", toClean);
- Set<String> allUsers = new HashSet<>(userArchives.keySet());
- allUsers.addAll(userFiles.keySet());
- for (String user : allUsers) {
- ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
- ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
- if ((filesForUser == null || filesForUser.size() == 0)
- && (archivesForUser == null || archivesForUser.size() == 0)) {
-
- LOG.debug("removing empty set: {}", user);
- try {
- LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
- userFiles.remove(user);
- userArchives.remove(user);
- } catch (IOException e) {
- LOG.error("Error trying to delete cached user files", e);
+ LOG.debug("Resource cleanup: {}", toClean);
+ Set<String> allUsers = new HashSet<>(userArchives.keySet());
+ allUsers.addAll(userFiles.keySet());
+ for (String user : allUsers) {
+ ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
+ ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
+ if ((filesForUser == null || filesForUser.size() == 0)
+ && (archivesForUser == null || archivesForUser.size() == 0)) {
+
+ LOG.debug("removing empty set: {}", user);
+ try {
+ LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
+ userFiles.remove(user);
+ userArchives.remove(user);
+ } catch (IOException e) {
+ LOG.error("Error trying to delete cached user files", e);
+ }
}
}
+ } catch (Exception ex) {
+ LOG.error("AsyncLocalizer cleanup failure", ex);
+ } catch (Error error) {
+ LOG.error("AsyncLocalizer cleanup failure", error);
+ Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
}
}
[2/2] storm git commit: Merge branch 'agresch_localizercleanup' of https://github.com/agresch/storm into STORM-3168
Posted by bo...@apache.org.
Merge branch 'agresch_localizercleanup' of https://github.com/agresch/storm into STORM-3168
STORM-3168: prevent AsyncLocalizer cleanup from crashing
This closes #2786
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aff66764
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aff66764
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aff66764
Branch: refs/heads/master
Commit: aff667643e220d4f5353a63b9a759d1646366224
Parents: 5ef7c1d 5c6e102
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Aug 3 09:58:05 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Aug 3 09:58:05 2018 -0500
----------------------------------------------------------------------
.../apache/storm/localizer/AsyncLocalizer.java | 99 +++++++++++---------
1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------