You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/01/14 15:15:30 UTC

[storm] branch master updated: STORM-3724 use blobstore modtime to prevent querying each remote file on update (#3363)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new af0b1fd  STORM-3724 use blobstore modtime to prevent querying each remote file on update (#3363)
af0b1fd is described below

commit af0b1fd873a4c28253811d9bc27e32c4279daddf
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Jan 14 09:15:12 2021 -0600

    STORM-3724 use blobstore modtime to prevent querying each remote file on update (#3363)
    
    * STORM-3724 use blobstore modtime to prevent querying each blob remote file on update
---
 .../apache/storm/hdfs/blobstore/HdfsBlobStore.java |  14 +++
 .../storm/hdfs/blobstore/HdfsBlobStoreImpl.java    |  75 +++++++++++++++
 .../storm/hdfs/blobstore/HdfsClientBlobStore.java  |   6 ++
 .../jvm/org/apache/storm/blobstore/BlobStore.java  |  18 ++++
 .../apache/storm/blobstore/ClientBlobStore.java    |  10 +-
 .../storm/blobstore/LocalModeClientBlobStore.java  |   6 ++
 .../apache/storm/blobstore/NimbusBlobStore.java    |   5 +
 .../storm/blobstore/ClientBlobStoreTest.java       |   6 ++
 .../org/apache/storm/daemon/nimbus/Nimbus.java     |  19 +++-
 .../org/apache/storm/localizer/AsyncLocalizer.java |  48 ++++------
 .../apache/storm/localizer/LocallyCachedBlob.java  |  92 ++++++++++++++++++-
 .../org/apache/storm/utils/ServerConfigUtils.java  |   5 +
 .../apache/storm/localizer/AsyncLocalizerTest.java |  16 +---
 .../storm/localizer/LocallyCachedBlobTest.java     | 102 +++++++++++++++++++++
 14 files changed, 377 insertions(+), 45 deletions(-)

diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 74ec2ef..7d95718 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -404,4 +404,18 @@ public class HdfsBlobStore extends BlobStore {
     public void fullCleanup(long age) throws IOException {
         hbs.fullCleanup(age);
     }
+
+    public long getLastBlobUpdateTime() throws IOException {
+        return hbs.getLastBlobUpdateTime();
+    }
+
+    @Override
+    public void updateLastBlobUpdateTime() throws IOException {
+        hbs.updateLastBlobUpdateTime();
+    }
+
+    @Override
+    public void validateBlobUpdateTime() throws IOException {
+        hbs.validateBlobUpdateTime();
+    }
 }
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index e197c39..7f81c94 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -18,15 +18,22 @@
 
 package org.apache.storm.hdfs.blobstore;
 
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +41,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStoreFile;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +54,7 @@ public class HdfsBlobStoreImpl {
     // blobstore directory is private!
     public static final FsPermission BLOBSTORE_DIR_PERMISSION =
             FsPermission.createImmutable((short) 0700); // rwx--------
+    private static final String BLOBSTORE_UPDATE_TIME_FILE = "lastUpdatedBlobTime";
 
     private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
 
@@ -312,4 +321,70 @@ public class HdfsBlobStoreImpl {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob, as recorded in the blobstore update time file.
+     *
+     * @return the last updated time of blobs within the blobstore, or -1 if none is recorded or the record is not a number.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path updateTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(updateTimeFile)) {
+            return -1L;
+        }
+        String timestamp;
+        // try-with-resources closes the stream even when reading fails; trim tolerates stray whitespace/newlines
+        try (FSDataInputStream inputStream = fileSystem.open(updateTimeFile)) {
+            timestamp = IOUtils.toString(inputStream, StandardCharsets.UTF_8).trim();
+        }
+        try {
+            return Long.parseLong(timestamp);
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore update time {} in file {}", timestamp, updateTimeFile);
+            return -1L;
+        }
+    }
+
+    /**
+     * Updates the last updated time of the blobstore to the current time, overwriting any previously recorded value.
+     *
+     * @throws IOException on any error
+     */
+    public synchronized void updateLastBlobUpdateTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        Path updateTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        try (FSDataOutputStream out = fileSystem.create(updateTimeFile, true);
+             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
+            writer.write(Long.toString(timestamp));
+        }
+        LOG.debug("Updated blobstore update time of {} to {}", updateTimeFile, timestamp);
+    }
+
+    /**
+     * Checks the recorded blobstore update time against the bucket directories and refreshes it when it is stale.
+     *
+     * @throws IOException on any error
+     */
+    public void validateBlobUpdateTime() throws IOException {
+        long newestBucketModTime = 0;
+        for (int bucket = 0; bucket < BUCKETS; bucket++) {
+            Path bucketDir = new Path(fullPath, String.valueOf(bucket));
+
+            // skip bucket dirs that are missing or hold no files
+            if (!fileSystem.exists(bucketDir) || fileSystem.listStatus(bucketDir).length == 0) {
+                continue;
+            }
+            long modtime = fileSystem.getFileStatus(bucketDir).getModificationTime();
+            newestBucketModTime = Math.max(newestBucketModTime, modtime);
+        }
+
+        if (newestBucketModTime <= 0) {
+            return;
+        }
+        // a bucket dir was modified after the recorded time, so the recorded time is stale
+        if (newestBucketModTime > getLastBlobUpdateTime()) {
+            LOG.info("Blobstore update time requires an update to at least {}", newestBucketModTime);
+            updateLastBlobUpdateTime();
+        }
+    }
 }
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
index 65861eb..fbdc186 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.hdfs.blobstore;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -133,4 +134,9 @@ public class HdfsClientBlobStore extends ClientBlobStore {
             client = null;
         }
     }
+
+    @Override
+    public long getRemoteBlobstoreUpdateTime() throws IOException {
+        return blobStore.getLastBlobUpdateTime();
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index c9219c5..5f41584 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -319,6 +319,24 @@ public abstract class BlobStore implements Shutdownable, AutoCloseable {
     }
 
     /**
+     * Updates the last update time of existing blobs in the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastBlobUpdateTime() throws IOException {
+        // default implementation is a NOOP.
+    }
+
+    /**
+     * Validates that the blob update time of the blobstore is up to date with the current existing blobs.
+     *
+     * @throws IOException on any error
+     */
+    public void validateBlobUpdateTime() throws IOException {
+        // default implementation is a NOOP.
+    }
+
+    /**
      * Blob store implements its own version of iterator to list the blobs.
      */
     public static class KeyTranslationIterator implements Iterator<String> {
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index a83146d..02b284f 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.blobstore;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.storm.daemon.Shutdownable;
@@ -182,5 +183,12 @@ public abstract class ClientBlobStore implements Shutdownable, AutoCloseable {
         void run(ClientBlobStore blobStore) throws Exception;
     }
 
-
+    /**
+     * Client facing API to get the last update time of existing blobs in a blobstore.  This is only required for use on
+     * supervisors.
+     *
+     * @return the timestamp of when the blobstore was last updated.  -1L if the blobstore
+     *     does not support this.
+     */
+    public abstract long getRemoteBlobstoreUpdateTime() throws IOException;
 }
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
index 70ed6c2..776371f 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.blobstore;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.storm.generated.AuthorizationException;
@@ -123,4 +124,9 @@ public class LocalModeClientBlobStore extends ClientBlobStore {
     public void close() {
         wrapped.shutdown();
     }
+
+    @Override
+    public long getRemoteBlobstoreUpdateTime() throws IOException {
+        return -1L; // not supported
+    }
 }
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 9a8f034..09be089 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -217,6 +217,11 @@ public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
         shutdown();
     }
 
+    @Override
+    public long getRemoteBlobstoreUpdateTime() throws IOException {
+        return -1L; // not supported
+    }
+
     public class NimbusKeyIterator implements Iterator<String> {
         private ListBlobsResult listBlobs = null;
         private int offset = 0;
diff --git a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
index 65a37df..8bbb015 100644
--- a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
+++ b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.blobstore;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -185,5 +186,10 @@ public class ClientBlobStoreTest {
         @Override
         public void createStateInZookeeper(String key) {
         }
+
+        @Override
+        public long getRemoteBlobstoreUpdateTime() throws IOException {
+            return -1L; // not supported
+        }
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 03a9e46..9f1bfea 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1427,6 +1427,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     }
                 });
 
+            // Periodically make sure the blobstore update time is up to date.  This could have failed if Nimbus encountered
+            // an exception updating the update time, or due to bugs causing a missed update of the blobstore mod time on a blob
+            // update.
+            timer.scheduleRecurring(30, ServerConfigUtils.getLocalizerUpdateBlobInterval(conf) * 5,
+                () -> {
+                    try {
+                        blobStore.validateBlobUpdateTime();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
             metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
                     .parallelStream()
                     .mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
@@ -3765,6 +3777,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             os.close();
             LOG.info("Finished uploading blob for session {}. Closing session.", session);
             blobUploaders.remove(session);
+            blobStore.updateLastBlobUpdateTime();
         } catch (Exception e) {
             LOG.warn("finish blob upload exception.", e);
             if (e instanceof TException) {
@@ -3812,6 +3825,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastBlobUpdateTime();
         } catch (Exception e) {
             LOG.warn("set blob meta exception.", e);
             if (e instanceof TException) {
@@ -3952,7 +3966,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     public int updateBlobReplication(String key, int replication)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
-            return blobStore.updateBlobReplication(key, replication, getSubject());
+            int result = blobStore.updateBlobReplication(key, replication, getSubject());
+            blobStore.updateLastBlobUpdateTime();
+            return result;
         } catch (Exception e) {
             LOG.warn("update blob replication exception.", e);
             if (e instanceof TException) {
@@ -4035,6 +4051,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             channel.close();
             LOG.info("Finished uploading file from client: {}", location);
             uploaders.remove(location);
+            blobStore.updateLastBlobUpdateTime();
         } catch (Exception e) {
             LOG.warn("finish file upload exception.", e);
             if (e instanceof TException) {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 399dbf1..2c3eaa1 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -59,6 +59,7 @@ import org.apache.storm.thrift.transport.TTransportException;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.NimbusLeaderNotFoundException;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.WrappedKeyNotFoundException;
@@ -74,10 +75,8 @@ public class AsyncLocalizer implements AutoCloseable {
     private static final CompletableFuture<Void> ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
     private static final int ATTEMPTS_INTERVAL_TIME = 100;
 
-    private final Timer singleBlobLocalizationDuration;
     private final Timer blobCacheUpdateDuration;
     private final Timer blobLocalizationDuration;
-    private final Meter numBlobUpdateVersionChanged;
     private final Meter localResourceFileNotFoundWhenReleasingSlot;
     private final Meter updateBlobExceptions;
 
@@ -100,7 +99,7 @@ public class AsyncLocalizer implements AutoCloseable {
     private final ScheduledExecutorService downloadExecService;
     private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
-    private final long updateBlobPeriod;
+    private final int updateBlobPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
     @VisibleForTesting
@@ -109,10 +108,8 @@ public class AsyncLocalizer implements AutoCloseable {
     @VisibleForTesting
     AsyncLocalizer(Map<String, Object> conf, AdvancedFSOps ops, String baseDir, StormMetricsRegistry metricsRegistry) throws IOException {
         this.conf = conf;
-        this.singleBlobLocalizationDuration = metricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
         this.blobCacheUpdateDuration = metricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
         this.blobLocalizationDuration = metricsRegistry.registerTimer("supervisor:blob-localization-duration");
-        this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
         this.localResourceFileNotFoundWhenReleasingSlot
                 = metricsRegistry.registerMeter("supervisor:local-resource-file-not-found-when-releasing-slot");
         this.updateBlobExceptions = metricsRegistry.registerMeter("supervisor:update-blob-exceptions");
@@ -127,8 +124,7 @@ public class AsyncLocalizer implements AutoCloseable {
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
 
-        updateBlobPeriod = ObjectReader.getInt(conf.get(
-                DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS), 30).longValue();
+        updateBlobPeriod = ServerConfigUtils.getLocalizerUpdateBlobInterval(conf);
 
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
@@ -270,6 +266,9 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
+
+        final long remoteBlobstoreUpdateTime = getRemoteBlobstoreUpdateTime();
+
         CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()];
         int i = 0;
         for (final LocallyCachedBlob blob : blobs) {
@@ -280,29 +279,7 @@ public class AsyncLocalizer implements AutoCloseable {
                     long failures = 0;
                     while (!done) {
                         try {
-                            synchronized (blob) {
-                                if (blob.isUsed()) {
-                                    long localVersion = blob.getLocalVersion();
-                                    long remoteVersion = blob.getRemoteVersion(blobStore);
-                                    if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
-                                        if (blob.isFullyDownloaded()) {
-                                            //Avoid case of different blob version
-                                            // when blob is not downloaded (first time download)
-                                            numBlobUpdateVersionChanged.mark();
-                                        }
-                                        Timer.Context t = singleBlobLocalizationDuration.time();
-                                        try {
-                                            long newVersion = blob.fetchUnzipToTemp(blobStore);
-                                            blob.informReferencesAndCommitNewVersion(newVersion);
-                                            t.stop();
-                                        } finally {
-                                            blob.cleanupOrphanedData();
-                                        }
-                                    }
-                                } else {
-                                    LOG.debug("Skipping update of unused blob {}", blob);
-                                }
-                            }
+                            blob.update(blobStore, remoteBlobstoreUpdateTime);
                             done = true;
                         } catch (Exception e) {
                             failures++;
@@ -321,6 +298,17 @@ public class AsyncLocalizer implements AutoCloseable {
         return CompletableFuture.allOf(all);
     }
 
+    /** Fetches the update time of the remote blobstore, returning -1 when it cannot be read. */
+    private long getRemoteBlobstoreUpdateTime() {
+        try (ClientBlobStore blobStore = getClientBlobStore()) {
+            return blobStore.getRemoteBlobstoreUpdateTime();
+        } catch (IOException e) {
+            // fall back to -1 rather than failing the whole download/update pass
+            LOG.error("Failed to get remote blobstore update time", e);
+            return -1L;
+        }
+    }
+
     /**
      * Downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files
      * with a suffix. The runnable is intended to be run periodically by a timer, created elsewhere.
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 0373fa2..cf53dff 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -13,6 +13,8 @@
 package org.apache.storm.localizer;
 
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -27,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.generated.AuthorizationException;
@@ -52,6 +53,9 @@ public abstract class LocallyCachedBlob {
     private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis());
 
     private final Histogram fetchingRate;
+    private final Meter numBlobUpdateVersionChanged;
+    private final Timer singleBlobLocalizationDuration;
+    protected long localUpdateTime = -1L;
 
     /**
      * Create a new LocallyCachedBlob.
@@ -63,6 +67,8 @@ public abstract class LocallyCachedBlob {
         this.blobDescription = blobDescription;
         this.blobKey = blobKey;
         this.fetchingRate = metricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
+        this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
+        this.singleBlobLocalizationDuration = metricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
     }
 
     /**
@@ -310,6 +316,90 @@ public abstract class LocallyCachedBlob {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore; values that are not positive disable the shortcut
+     * @return true if the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100s of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob localUpdateTime matches remote timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        long localVersion = this.getLocalVersion();
+        long remoteVersion = this.getRemoteVersion(blobStore);
+        if (localVersion != remoteVersion) {
+            return true;
+        } else {
+            // versions match: remember the remote blobstore update time we have now been verified against
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            return false;
+        }
+    }
+
+    /**
+     * Fetches the remote blob, commits it as the new local version and records the remote blobstore update time.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime)
+            throws AuthorizationException, IOException, KeyNotFoundException {
+        if (this.isFullyDownloaded()) {
+            numBlobUpdateVersionChanged.mark();
+        }
+        Timer.Context timer = singleBlobLocalizationDuration.time();
+        try {
+            long newVersion = this.fetchUnzipToTemp(blobStore);
+            this.informReferencesAndCommitNewVersion(newVersion);
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            LOG.debug("local blob {} downloaded, in sync with remote blobstore to time {}", this, remoteBlobstoreUpdateTime);
+        } finally {
+            timer.stop();
+            this.cleanupOrphanedData();
+        }
+    }
+
+    /**
+     * Checks whether this blob needs updating and downloads it if so, synchronizing on this blob for the whole operation.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     * @throws IOException on errors
+     */
+    public void update(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime)
+            throws KeyNotFoundException, AuthorizationException, IOException {
+        synchronized (this) {
+            if (this.requiresUpdate(blobStore, remoteBlobstoreUpdateTime)) {
+                this.download(blobStore, remoteBlobstoreUpdateTime);
+            }
+        }
+    }
+
     static class DownloadMeta {
         private final Path downloadPath;
         private final long version;
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
index 156bf20..a6b0ba6 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
@@ -158,4 +158,9 @@ public class ServerConfigUtils {
     public LocalState nimbusTopoHistoryStateImpl(Map<String, Object> conf) throws IOException {
         return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"), true);
     }
+
+    public static int getLocalizerUpdateBlobInterval(Map<String, Object> conf) {
+        return ObjectReader.getInt(conf.get(
+                DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS), 30);
+    }
 }
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index f97c868..3b434c3 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -120,6 +120,7 @@ public class AsyncLocalizerTest {
             final int port = 8080;
 
             ClientBlobStore blobStore = mock(ClientBlobStore.class);
+            when(blobStore.getRemoteBlobstoreUpdateTime()).thenReturn(-1L);
 
             LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
             doReturn(jarBlob).when(victim).getTopoJar(topoId, localAssignment.get_owner());
@@ -148,18 +149,9 @@ public class AsyncLocalizerTest {
             Future<Void> f = victim.requestDownloadBaseTopologyBlobs(pna, null);
             f.get(20, TimeUnit.SECONDS);
 
-            verify(jarBlob).fetchUnzipToTemp(any());
-            verify(jarBlob).informReferencesAndCommitNewVersion(100L);
-            verify(jarBlob).cleanupOrphanedData();
-
-            verify(codeBlob).fetchUnzipToTemp(any());
-            verify(codeBlob).informReferencesAndCommitNewVersion(200L);
-            verify(codeBlob).cleanupOrphanedData();
-
-            verify(confBlob).fetchUnzipToTemp(any());
-            verify(confBlob).informReferencesAndCommitNewVersion(300L);
-            verify(confBlob).cleanupOrphanedData();
-
+            verify(jarBlob).update(eq(blobStore), eq(-1L));
+            verify(codeBlob).update(eq(blobStore), eq(-1L));
+            verify(confBlob).update(eq(blobStore), eq(-1L));
         } finally {
             ReflectionUtils.setInstance(previousReflectionUtils);
             ServerUtils.setInstance(previousServerUtils);
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
new file mode 100644
index 0000000..a5cbf9b
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link LocallyCachedBlob#requiresUpdate(ClientBlobStore, long)}.
+ *
+ * <p>Covers a blob with no references, a referenced blob that is not fully downloaded, and a fully
+ * downloaded blob whose local version and update time are compared against the remote blobstore.
+ */
+public class LocallyCachedBlobTest {
+    private static final ClientBlobStore blobStore = Mockito.mock(ClientBlobStore.class);
+    private static final PortAndAssignment pna = new PortAndAssignmentImpl(6077, new LocalAssignment());
+    private static final Map<String, Object> conf = new HashMap<>();
+
+    /** A blob with no references is expected not to require an update. */
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    /** A referenced blob that is not fully downloaded is expected to require an update. */
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    /**
+     * Walks a referenced, fully downloaded blob through version and update-time combinations and
+     * asserts that an update is required only while the local version differs from the remote one
+     * and the remote blobstore update time does not match the blob's local update time.
+     */
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch (local version 9, remote version 10)
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore update time, validate blob
+        // will skip looking at remote version and assume it's up to date
+        blob.localUpdateTime = 101L;
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+        // now when the update time on the remote blobstore differs, we should again see that the
+        // blob version differs from the remote blobstore
+        Assert.assertTrue(blob.requiresUpdate(blobStore, 102L));
+
+        // now validate we don't need any update as versions match, regardless of remote blobstore update time
+        blob.localVersion = blob.getRemoteVersion(blobStore);
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 102L));
+    }
+
+    /**
+     * {@link LocalizedResource} stub that always reports itself as fully downloaded, always returns
+     * 10 as the remote version, and returns a settable {@code localVersion} (initially 9) as the
+     * local version, so a test controls both sides of the version comparison.
+     *
+     * <p>Declared {@code static} because it never uses the enclosing test instance.
+     */
+    public static class TestableBlob extends LocalizedResource {
+        long localVersion = 9L;
+
+        TestableBlob(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps,
+                     Map<String, Object> conf, String user, StormMetricsRegistry metricRegistry) {
+            super(key, localBaseDir, shouldUncompress, fsOps, conf, user, metricRegistry);
+        }
+
+        @Override
+        public boolean isFullyDownloaded() {
+            return true;
+        }
+
+        @Override
+        public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException {
+            return 10L;
+        }
+
+        @Override
+        public long getLocalVersion() {
+            return localVersion;
+        }
+    }
+}