You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/08/03 15:22:39 UTC

[1/2] storm git commit: STORM-3168 prevent AsyncLocalizer cleanup from crashing

Repository: storm
Updated Branches:
  refs/heads/master 5ef7c1d35 -> aff667643


STORM-3168 prevent AsyncLocalizer cleanup from crashing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c6e102a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c6e102a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c6e102a

Branch: refs/heads/master
Commit: 5c6e102aa4e4d5c8f9515fa919f38130872eb18a
Parents: 146beff
Author: Aaron Gresch <ag...@yahoo-inc.com>
Authored: Wed Aug 1 14:53:53 2018 -0500
Committer: Aaron Gresch <ag...@yahoo-inc.com>
Committed: Thu Aug 2 13:45:35 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/localizer/AsyncLocalizer.java  | 99 +++++++++++---------
 1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5c6e102a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 19a8cd3..9c2596d 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -567,60 +567,67 @@ public class AsyncLocalizer implements AutoCloseable {
 
     @VisibleForTesting
     void cleanup() {
-        LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
-        // need one large set of all and then clean via LRU
-        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
-            toClean.addResources(t.getValue());
-            LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
-        }
+        try {
+            LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
+            // need one large set of all and then clean via LRU
+            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
+                toClean.addResources(t.getValue());
+                LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
+            }
 
-        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
-            toClean.addResources(t.getValue());
-            LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
-        }
+            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
+                toClean.addResources(t.getValue());
+                LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
+            }
 
-        toClean.addResources(topologyBlobs);
-        try (ClientBlobStore store = getClientBlobStore()) {
-            toClean.cleanup(store);
-        }
+            toClean.addResources(topologyBlobs);
+            try (ClientBlobStore store = getClientBlobStore()) {
+                toClean.cleanup(store);
+            }
 
-        HashSet<String> safeTopologyIds = new HashSet<>();
-        for (String blobKey : topologyBlobs.keySet()) {
-            safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
-        }
+            HashSet<String> safeTopologyIds = new HashSet<>();
+            for (String blobKey : topologyBlobs.keySet()) {
+                safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
+            }
 
-        //Deleting this early does not hurt anything
-        topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
-        blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
+            //Deleting this early does not hurt anything
+            topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
+            blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
 
-        try {
-            forEachTopologyDistDir((p, topologyId) -> {
-                if (!safeTopologyIds.contains(topologyId)) {
-                    fsOps.deleteIfExists(p.toFile());
-                }
-            });
-        } catch (Exception e) {
-            LOG.error("Could not read topology directories for cleanup", e);
-        }
+            try {
+                forEachTopologyDistDir((p, topologyId) -> {
+                    if (!safeTopologyIds.contains(topologyId)) {
+                        fsOps.deleteIfExists(p.toFile());
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Could not read topology directories for cleanup", e);
+            }
 
-        LOG.debug("Resource cleanup: {}", toClean);
-        Set<String> allUsers = new HashSet<>(userArchives.keySet());
-        allUsers.addAll(userFiles.keySet());
-        for (String user : allUsers) {
-            ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
-            ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
-            if ((filesForUser == null || filesForUser.size() == 0)
-                && (archivesForUser == null || archivesForUser.size() == 0)) {
-
-                LOG.debug("removing empty set: {}", user);
-                try {
-                    LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
-                    userFiles.remove(user);
-                    userArchives.remove(user);
-                } catch (IOException e) {
-                    LOG.error("Error trying to delete cached user files", e);
+            LOG.debug("Resource cleanup: {}", toClean);
+            Set<String> allUsers = new HashSet<>(userArchives.keySet());
+            allUsers.addAll(userFiles.keySet());
+            for (String user : allUsers) {
+                ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
+                ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
+                if ((filesForUser == null || filesForUser.size() == 0)
+                        && (archivesForUser == null || archivesForUser.size() == 0)) {
+
+                    LOG.debug("removing empty set: {}", user);
+                    try {
+                        LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
+                        userFiles.remove(user);
+                        userArchives.remove(user);
+                    } catch (IOException e) {
+                        LOG.error("Error trying to delete cached user files", e);
+                    }
                 }
             }
+        } catch (Exception ex) {
+            LOG.error("AsyncLocalizer cleanup failure", ex);
+        } catch (Error error) {
+            LOG.error("AsyncLocalizer cleanup failure", error);
+            Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
         }
     }
 


[2/2] storm git commit: Merge branch 'agresch_localizercleanup' of https://github.com/agresch/storm into STORM-3168

Posted by bo...@apache.org.
Merge branch 'agresch_localizercleanup' of https://github.com/agresch/storm into STORM-3168

STORM-3168: prevent AsyncLocalizer cleanup from crashing

This closes #2786


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aff66764
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aff66764
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aff66764

Branch: refs/heads/master
Commit: aff667643e220d4f5353a63b9a759d1646366224
Parents: 5ef7c1d 5c6e102
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Aug 3 09:58:05 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Aug 3 09:58:05 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/localizer/AsyncLocalizer.java  | 99 +++++++++++---------
 1 file changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------