You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/03/23 16:29:44 UTC
[storm] branch master updated: [STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new ec217df [STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)
ec217df is described below
commit ec217df0efa11de58fffb01a907fc9c42f255958
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Mon Mar 23 11:29:35 2020 -0500
[STORM-3583] Handle exceptions when AsyncLocalizer tries to get local resources (#3226)
---
docs/ClusterMetrics.md | 1 +
.../org/apache/storm/localizer/AsyncLocalizer.java | 35 +++++++++++++++++-----
2 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index f7f7b4f..7a3adea 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -178,6 +178,7 @@ Metrics associated with the supervisor, which launches the workers for a topolog
| supervisor:blob-localization-duration | timer | Approximately how long it takes to get the blob we want after it is requested. |
| supervisor:current-reserved-memory-mb | gauge | total amount of memory reserved for workers on the supervisor (MB) |
| supervisor:current-used-memory-mb | gauge | memory currently used as measured by the supervisor (this typically requires cgroups) (MB) |
+| supervisor:local-resource-file-not-found-when-releasing-slot | meter | number of times a file-not-found exception occurs when reading local blobs while releasing a slot. |
| supervisor:num-blob-update-version-changed | meter | number of times a version of a blob changes. |
| supervisor:num-cleanup-exceptions | meter | exceptions thrown during container cleanup. |
| supervisor:num-force-kill-exceptions | meter | exceptions thrown during force kill. |
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 48ad7e8..f59943d 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,6 +21,7 @@ package org.apache.storm.localizer;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -77,6 +78,7 @@ public class AsyncLocalizer implements AutoCloseable {
private final Timer blobCacheUpdateDuration;
private final Timer blobLocalizationDuration;
private final Meter numBlobUpdateVersionChanged;
+ private final Meter localResourceFileNotFoundWhenReleasingSlot;
// track resources - user to resourceSet
//ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and
@@ -109,6 +111,8 @@ public class AsyncLocalizer implements AutoCloseable {
this.blobCacheUpdateDuration = metricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
this.blobLocalizationDuration = metricsRegistry.registerTimer("supervisor:blob-localization-duration");
this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
+ this.localResourceFileNotFoundWhenReleasingSlot
+ = metricsRegistry.registerMeter("supervisor:local-resource-file-not-found-when-releasing-slot");
this.metricsRegistry = metricsRegistry;
isLocalMode = ConfigUtils.isLocalMode(conf);
fsOps = ops;
@@ -456,13 +460,31 @@ public class AsyncLocalizer implements AutoCloseable {
topoConfBlob.removeReference(pna);
}
- for (LocalResource lr : getLocalResources(pna)) {
- try {
- removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
- } catch (Exception e) {
- throw new IOException(e);
+
+ // ALERT: A possible race condition should have been resolved
+ // by separating the thread pools into downloadExecService and taskExecService
+ // https://github.com/apache/storm/pull/3153
+ // Will need further investigation if the race condition happens again
+ List<LocalResource> localResources;
+ try {
+ // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
+ // Precondition2: Both blob files are fully downloaded and have the proper permissions set
+ localResources = getLocalResources(pna);
+ } catch (IOException e) {
+ LOG.info("Port and assignment info: {}", pna);
+ if (e instanceof FileNotFoundException) {
+ localResourceFileNotFoundWhenReleasingSlot.mark();
+ LOG.warn("Local base blobs have not been downloaded yet. ", e);
+ return;
+ } else {
+ LOG.error("Unable to read local file. ", e);
+ throw e;
}
}
+
+ for (LocalResource lr : localResources) {
+ removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
+ }
}
// baseDir/supervisor/usercache/user1/
@@ -509,8 +531,7 @@ public class AsyncLocalizer implements AutoCloseable {
}
// ignores invalid user/topo/key
- void removeBlobReference(String key, PortAndAssignment pna,
- boolean uncompress) throws AuthorizationException, KeyNotFoundException {
+ void removeBlobReference(String key, PortAndAssignment pna, boolean uncompress) {
String user = pna.getOwner();
String topo = pna.getToplogyId();
ConcurrentMap<String, LocalizedResource> lrsrcSet = uncompress ? userArchives.get(user) : userFiles.get(user);