You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/21 18:39:06 UTC
[storm] branch master updated: [STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 19e23ae [STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
19e23ae is described below
commit 19e23ae4a44fcf8e74473a48d408a712cde45d64
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Mon Sep 21 13:38:54 2020 -0500
[STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
---
.../java/org/apache/storm/localizer/AsyncLocalizer.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f59943d..054dc62 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -467,14 +467,14 @@ public class AsyncLocalizer implements AutoCloseable {
// Will need further investigation if the race condition happens again
List<LocalResource> localResources;
try {
- // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
- // Precondition2: Both these two blob files are fully downloaded and proper permission been set
+ // Precondition1: Base blob stormconf.ser and stormcode.ser are available
+ // Precondition2: Both files have proper permission
localResources = getLocalResources(pna);
} catch (IOException e) {
LOG.info("Port and assignment info: {}", pna);
if (e instanceof FileNotFoundException) {
localResourceFileNotFoundWhenReleasingSlot.mark();
- LOG.warn("Local base blobs have not been downloaded yet. ", e);
+ LOG.warn("Local base blobs are not available. ", e);
return;
} else {
LOG.error("Unable to read local file. ", e);
@@ -613,6 +613,7 @@ public class AsyncLocalizer implements AutoCloseable {
@VisibleForTesting
void cleanup() {
try {
+ LOG.info("Starting cleanup");
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
// need one large set of all and then clean via LRU
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
@@ -641,7 +642,12 @@ public class AsyncLocalizer implements AutoCloseable {
try {
forEachTopologyDistDir((p, topologyId) -> {
- if (!safeTopologyIds.contains(topologyId)) {
+ String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
+ String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+ String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
+ if (!topologyBlobs.containsKey(topoJarKey)
+ && !topologyBlobs.containsKey(topoCodeKey)
+ && !topologyBlobs.containsKey(topoConfKey)) {
fsOps.deleteIfExists(p.toFile());
}
});
@@ -673,6 +679,8 @@ public class AsyncLocalizer implements AutoCloseable {
} catch (Error error) {
LOG.error("AsyncLocalizer cleanup failure", error);
Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
+ } finally {
+ LOG.info("Finish cleanup");
}
}