You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/23 16:29:44 UTC

[storm] branch master updated: [STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new ec217df  [STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)
ec217df is described below

commit ec217df0efa11de58fffb01a907fc9c42f255958
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Mon Mar 23 11:29:35 2020 -0500

    [STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)
---
 docs/ClusterMetrics.md                             |  1 +
 .../org/apache/storm/localizer/AsyncLocalizer.java | 35 +++++++++++++++++-----
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index f7f7b4f..7a3adea 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -178,6 +178,7 @@ Metrics associated with the supervisor, which launches the workers for a topolog
 | supervisor:blob-localization-duration | timer | Approximately how long it takes to get the blob we want after it is requested. |
 | supervisor:current-reserved-memory-mb | gauge | total amount of memory reserved for workers on the supervisor (MB) |
 | supervisor:current-used-memory-mb | gauge | memory currently used as measured by the supervisor (this typically requires cgroups) (MB) |
+| supervisor:local-resource-file-not-found-when-releasing-slot | meter | number of times a file-not-found exception occurs when reading local blobs upon releasing slots |
 | supervisor:num-blob-update-version-changed | meter | number of times a version of a blob changes. |
 | supervisor:num-cleanup-exceptions | meter | exceptions thrown during container cleanup. |
 | supervisor:num-force-kill-exceptions | meter | exceptions thrown during force kill. |
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 48ad7e8..f59943d 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,6 +21,7 @@ package org.apache.storm.localizer;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -77,6 +78,7 @@ public class AsyncLocalizer implements AutoCloseable {
     private final Timer blobCacheUpdateDuration;
     private final Timer blobLocalizationDuration;
     private final Meter numBlobUpdateVersionChanged;
+    private final Meter localResourceFileNotFoundWhenReleasingSlot;
 
     // track resources - user to resourceSet
     //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and
@@ -109,6 +111,8 @@ public class AsyncLocalizer implements AutoCloseable {
         this.blobCacheUpdateDuration = metricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
         this.blobLocalizationDuration = metricsRegistry.registerTimer("supervisor:blob-localization-duration");
         this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
+        this.localResourceFileNotFoundWhenReleasingSlot
+                = metricsRegistry.registerMeter("supervisor:local-resource-file-not-found-when-releasing-slot");
         this.metricsRegistry = metricsRegistry;
         isLocalMode = ConfigUtils.isLocalMode(conf);
         fsOps = ops;
@@ -456,13 +460,31 @@ public class AsyncLocalizer implements AutoCloseable {
             topoConfBlob.removeReference(pna);
         }
 
-        for (LocalResource lr : getLocalResources(pna)) {
-            try {
-                removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
-            } catch (Exception e) {
-                throw new IOException(e);
+
+        // ALERT: A possible race condition should have been resolved
+        // by separating the thread pools into downloadExecService and taskExecService
+        // https://github.com/apache/storm/pull/3153
+        // Will need further investigation if the race condition happens again
+        List<LocalResource> localResources;
+        try {
+            // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
+            // Precondition2: Both of these blob files are fully downloaded and proper permissions have been set
+            localResources = getLocalResources(pna);
+        } catch (IOException e) {
+            LOG.info("Port and assignment info: {}", pna);
+            if (e instanceof FileNotFoundException) {
+                localResourceFileNotFoundWhenReleasingSlot.mark();
+                LOG.warn("Local base blobs have not been downloaded yet. ", e);
+                return;
+            } else {
+                LOG.error("Unable to read local file. ", e);
+                throw e;
             }
         }
+
+        for (LocalResource lr : localResources) {
+            removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
+        }
     }
 
     // baseDir/supervisor/usercache/user1/
@@ -509,8 +531,7 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     // ignores invalid user/topo/key
-    void removeBlobReference(String key, PortAndAssignment pna,
-                             boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+    void removeBlobReference(String key, PortAndAssignment pna, boolean uncompress) {
         String user = pna.getOwner();
         String topo = pna.getToplogyId();
         ConcurrentMap<String, LocalizedResource> lrsrcSet = uncompress ? userArchives.get(user) : userFiles.get(user);