You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/21 18:39:06 UTC

[storm] branch master updated: [STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 19e23ae  [STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
19e23ae is described below

commit 19e23ae4a44fcf8e74473a48d408a712cde45d64
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Mon Sep 21 13:38:54 2020 -0500

    [STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
---
 .../java/org/apache/storm/localizer/AsyncLocalizer.java  | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f59943d..054dc62 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -467,14 +467,14 @@ public class AsyncLocalizer implements AutoCloseable {
         // Will need further investigation if the race condition happens again
         List<LocalResource> localResources;
         try {
-            // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
-            // Precondition2: Both these two blob files are fully downloaded and proper permission been set
+            // Precondition1: Base blob stormconf.ser and stormcode.ser are available
+            // Precondition2: Both files have proper permission
             localResources = getLocalResources(pna);
         } catch (IOException e) {
             LOG.info("Port and assignment info: {}", pna);
             if (e instanceof FileNotFoundException) {
                 localResourceFileNotFoundWhenReleasingSlot.mark();
-                LOG.warn("Local base blobs have not been downloaded yet. ", e);
+                LOG.warn("Local base blobs are not available. ", e);
                 return;
             } else {
                 LOG.error("Unable to read local file. ", e);
@@ -613,6 +613,7 @@ public class AsyncLocalizer implements AutoCloseable {
     @VisibleForTesting
     void cleanup() {
         try {
+            LOG.info("Starting cleanup");
             LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
             // need one large set of all and then clean via LRU
             for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
@@ -641,7 +642,12 @@ public class AsyncLocalizer implements AutoCloseable {
 
             try {
                 forEachTopologyDistDir((p, topologyId) -> {
-                    if (!safeTopologyIds.contains(topologyId)) {
+                    String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
+                    String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+                    String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
+                    if (!topologyBlobs.containsKey(topoJarKey)
+                        && !topologyBlobs.containsKey(topoCodeKey)
+                        && !topologyBlobs.containsKey(topoConfKey)) {
                         fsOps.deleteIfExists(p.toFile());
                     }
                 });
@@ -673,6 +679,8 @@ public class AsyncLocalizer implements AutoCloseable {
         } catch (Error error) {
             LOG.error("AsyncLocalizer cleanup failure", error);
             Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
+        } finally {
+            LOG.info("Finish cleanup");
         }
     }