You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/01/14 15:15:30 UTC
[storm] branch master updated: STORM-3724 use blobstore modtime to
prevent querying each remote file on update (#3363)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new af0b1fd STORM-3724 use blobstore modtime to prevent querying each remote file on update (#3363)
af0b1fd is described below
commit af0b1fd873a4c28253811d9bc27e32c4279daddf
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Jan 14 09:15:12 2021 -0600
STORM-3724 use blobstore modtime to prevent querying each remote file on update (#3363)
* STORM-3724 use blobstore modtime to prevent querying each blob remote file on update
---
.../apache/storm/hdfs/blobstore/HdfsBlobStore.java | 14 +++
.../storm/hdfs/blobstore/HdfsBlobStoreImpl.java | 75 +++++++++++++++
.../storm/hdfs/blobstore/HdfsClientBlobStore.java | 6 ++
.../jvm/org/apache/storm/blobstore/BlobStore.java | 18 ++++
.../apache/storm/blobstore/ClientBlobStore.java | 10 +-
.../storm/blobstore/LocalModeClientBlobStore.java | 6 ++
.../apache/storm/blobstore/NimbusBlobStore.java | 5 +
.../storm/blobstore/ClientBlobStoreTest.java | 6 ++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 19 +++-
.../org/apache/storm/localizer/AsyncLocalizer.java | 48 ++++------
.../apache/storm/localizer/LocallyCachedBlob.java | 92 ++++++++++++++++++-
.../org/apache/storm/utils/ServerConfigUtils.java | 5 +
.../apache/storm/localizer/AsyncLocalizerTest.java | 16 +---
.../storm/localizer/LocallyCachedBlobTest.java | 102 +++++++++++++++++++++
14 files changed, 377 insertions(+), 45 deletions(-)
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 74ec2ef..7d95718 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -404,4 +404,18 @@ public class HdfsBlobStore extends BlobStore {
public void fullCleanup(long age) throws IOException {
hbs.fullCleanup(age);
}
+
+ public long getLastBlobUpdateTime() throws IOException {
+ return hbs.getLastBlobUpdateTime();
+ }
+
+ @Override
+ public void updateLastBlobUpdateTime() throws IOException {
+ hbs.updateLastBlobUpdateTime();
+ }
+
+ @Override
+ public void validateBlobUpdateTime() throws IOException {
+ hbs.validateBlobUpdateTime();
+ }
}
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
index e197c39..7f81c94 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
@@ -18,15 +18,22 @@
package org.apache.storm.hdfs.blobstore;
+import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
+
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,6 +41,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStoreFile;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +54,7 @@ public class HdfsBlobStoreImpl {
// blobstore directory is private!
public static final FsPermission BLOBSTORE_DIR_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
+ private static final String BLOBSTORE_UPDATE_TIME_FILE = "lastUpdatedBlobTime";
private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImpl.class);
@@ -312,4 +321,70 @@ public class HdfsBlobStoreImpl {
timer.cancel();
}
}
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * <p>The time is read from the {@code lastUpdatedBlobTime} file located under {@code fullPath}.
+     *
+     * @return the last updated time of blobs within the blobstore, or -1 if the update time file does not exist
+     *     or does not contain a valid number.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path updateTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(updateTimeFile)) {
+            return -1L;
+        }
+        String timestamp;
+        // try-with-resources releases the stream even when reading the file fails
+        try (FSDataInputStream inputStream = fileSystem.open(updateTimeFile)) {
+            timestamp = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+        }
+        try {
+            // tolerate surrounding whitespace (e.g. a trailing newline) around the number
+            return Long.parseLong(timestamp.trim());
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore update time {} in file {}", timestamp, updateTimeFile);
+            return -1L;
+        }
+    }
+
+ /**
+ * Updates the last updated time of existing blobstores to the current time.
+ *
+ * @throws IOException on any error
+ */
+ public synchronized void updateLastBlobUpdateTime() throws IOException {
+ Long timestamp = Time.currentTimeMillis();
+ Path updateTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+ FSDataOutputStream fsDataOutputStream = fileSystem.create(updateTimeFile, true);
+ BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ bufferedWriter.write(timestamp.toString());
+ bufferedWriter.close();
+ LOG.debug("Updated blobstore update time of {} to {}", updateTimeFile, timestamp);
+ }
+
+    /**
+     * Compares the recorded blobstore update time with the modification times of the bucket directories and
+     * refreshes the recorded time when a non-empty bucket was modified more recently.
+     *
+     * @throws IOException on any error
+     */
+    public void validateBlobUpdateTime() throws IOException {
+        long newestBucketModTime = 0;
+        for (int bucket = 0; bucket < BUCKETS; bucket++) {
+            Path bucketDir = new Path(fullPath, String.valueOf(bucket));
+
+            // bucket dirs that are missing or hold no files are not considered
+            if (!fileSystem.exists(bucketDir) || fileSystem.listStatus(bucketDir).length == 0) {
+                continue;
+            }
+
+            long bucketModTime = fileSystem.getFileStatus(bucketDir).getModificationTime();
+            newestBucketModTime = Math.max(newestBucketModTime, bucketModTime);
+        }
+
+        // nothing to validate against when no populated bucket was found
+        if (newestBucketModTime <= 0) {
+            return;
+        }
+        if (newestBucketModTime > getLastBlobUpdateTime()) {
+            LOG.info("Blobstore update time requires an update to at least {}", newestBucketModTime);
+            updateLastBlobUpdateTime();
+        }
+    }
}
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
index 65861eb..fbdc186 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -18,6 +18,7 @@
package org.apache.storm.hdfs.blobstore;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
@@ -133,4 +134,9 @@ public class HdfsClientBlobStore extends ClientBlobStore {
client = null;
}
}
+
+ @Override
+ public long getRemoteBlobstoreUpdateTime() throws IOException {
+ return blobStore.getLastBlobUpdateTime();
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index c9219c5..5f41584 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -319,6 +319,24 @@ public abstract class BlobStore implements Shutdownable, AutoCloseable {
}
/**
+ * Updates the last update time of existing blobs in the blobstore to the current time.
+ *
+ * @throws IOException on any error
+ */
+ public void updateLastBlobUpdateTime() throws IOException {
+ // default implementation is a NOOP.
+ }
+
+ /**
+ * Validates that the blob update time of the blobstore is up to date with the current existing blobs.
+ *
+ * @throws IOException on any error
+ */
+ public void validateBlobUpdateTime() throws IOException {
+ // default implementation is a NOOP.
+ }
+
+ /**
* Blob store implements its own version of iterator to list the blobs.
*/
public static class KeyTranslationIterator implements Iterator<String> {
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index a83146d..02b284f 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -12,6 +12,7 @@
package org.apache.storm.blobstore;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.daemon.Shutdownable;
@@ -182,5 +183,12 @@ public abstract class ClientBlobStore implements Shutdownable, AutoCloseable {
void run(ClientBlobStore blobStore) throws Exception;
}
-
+ /**
+ * Client facing API to get the last update time of existing blobs in a blobstore. This is only required for use on
+ * supervisors.
+ *
+ * @return the timestamp of when the blobstore was last updated. -1L if the blobstore
+ * does not support this.
+ */
+ public abstract long getRemoteBlobstoreUpdateTime() throws IOException;
}
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
index 70ed6c2..776371f 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
@@ -12,6 +12,7 @@
package org.apache.storm.blobstore;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.generated.AuthorizationException;
@@ -123,4 +124,9 @@ public class LocalModeClientBlobStore extends ClientBlobStore {
public void close() {
wrapped.shutdown();
}
+
+ @Override
+ public long getRemoteBlobstoreUpdateTime() throws IOException {
+ return -1L; // not supported
+ }
}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 9a8f034..09be089 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -217,6 +217,11 @@ public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
shutdown();
}
+ @Override
+ public long getRemoteBlobstoreUpdateTime() throws IOException {
+ return -1L; // not supported
+ }
+
public class NimbusKeyIterator implements Iterator<String> {
private ListBlobsResult listBlobs = null;
private int offset = 0;
diff --git a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
index 65a37df..8bbb015 100644
--- a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
+++ b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
@@ -12,6 +12,7 @@
package org.apache.storm.blobstore;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -185,5 +186,10 @@ public class ClientBlobStoreTest {
@Override
public void createStateInZookeeper(String key) {
}
+
+ @Override
+ public long getRemoteBlobstoreUpdateTime() throws IOException {
+ return -1L; // not supported
+ }
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 03a9e46..9f1bfea 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1427,6 +1427,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
});
+ // Periodically make sure the blobstore update time is up to date. The recorded time can be stale if Nimbus
+ // encountered an exception while updating it, or if a bug caused the blobstore mod time not to be updated on a
+ // blob update.
+ timer.scheduleRecurring(30, ServerConfigUtils.getLocalizerUpdateBlobInterval(conf) * 5,
+ () -> {
+ try {
+ blobStore.validateBlobUpdateTime();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
metricsRegistry.registerGauge("nimbus:total-available-memory-non-negative", () -> nodeIdToResources.get().values()
.parallelStream()
.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0))
@@ -3765,6 +3777,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
os.close();
LOG.info("Finished uploading blob for session {}. Closing session.", session);
blobUploaders.remove(session);
+ blobStore.updateLastBlobUpdateTime();
} catch (Exception e) {
LOG.warn("finish blob upload exception.", e);
if (e instanceof TException) {
@@ -3812,6 +3825,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
throws AuthorizationException, KeyNotFoundException, TException {
try {
blobStore.setBlobMeta(key, meta, getSubject());
+ blobStore.updateLastBlobUpdateTime();
} catch (Exception e) {
LOG.warn("set blob meta exception.", e);
if (e instanceof TException) {
@@ -3952,7 +3966,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public int updateBlobReplication(String key, int replication)
throws AuthorizationException, KeyNotFoundException, TException {
try {
- return blobStore.updateBlobReplication(key, replication, getSubject());
+ int result = blobStore.updateBlobReplication(key, replication, getSubject());
+ blobStore.updateLastBlobUpdateTime();
+ return result;
} catch (Exception e) {
LOG.warn("update blob replication exception.", e);
if (e instanceof TException) {
@@ -4035,6 +4051,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
channel.close();
LOG.info("Finished uploading file from client: {}", location);
uploaders.remove(location);
+ blobStore.updateLastBlobUpdateTime();
} catch (Exception e) {
LOG.warn("finish file upload exception.", e);
if (e instanceof TException) {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 399dbf1..2c3eaa1 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -59,6 +59,7 @@ import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusLeaderNotFoundException;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedKeyNotFoundException;
@@ -74,10 +75,8 @@ public class AsyncLocalizer implements AutoCloseable {
private static final CompletableFuture<Void> ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
private static final int ATTEMPTS_INTERVAL_TIME = 100;
- private final Timer singleBlobLocalizationDuration;
private final Timer blobCacheUpdateDuration;
private final Timer blobLocalizationDuration;
- private final Meter numBlobUpdateVersionChanged;
private final Meter localResourceFileNotFoundWhenReleasingSlot;
private final Meter updateBlobExceptions;
@@ -100,7 +99,7 @@ public class AsyncLocalizer implements AutoCloseable {
private final ScheduledExecutorService downloadExecService;
private final ScheduledExecutorService taskExecService;
private final long cacheCleanupPeriod;
- private final long updateBlobPeriod;
+ private final int updateBlobPeriod;
private final StormMetricsRegistry metricsRegistry;
// cleanup
@VisibleForTesting
@@ -109,10 +108,8 @@ public class AsyncLocalizer implements AutoCloseable {
@VisibleForTesting
AsyncLocalizer(Map<String, Object> conf, AdvancedFSOps ops, String baseDir, StormMetricsRegistry metricsRegistry) throws IOException {
this.conf = conf;
- this.singleBlobLocalizationDuration = metricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
this.blobCacheUpdateDuration = metricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
this.blobLocalizationDuration = metricsRegistry.registerTimer("supervisor:blob-localization-duration");
- this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
this.localResourceFileNotFoundWhenReleasingSlot
= metricsRegistry.registerMeter("supervisor:local-resource-file-not-found-when-releasing-slot");
this.updateBlobExceptions = metricsRegistry.registerMeter("supervisor:update-blob-exceptions");
@@ -127,8 +124,7 @@ public class AsyncLocalizer implements AutoCloseable {
cacheCleanupPeriod = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
- updateBlobPeriod = ObjectReader.getInt(conf.get(
- DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS), 30).longValue();
+ updateBlobPeriod = ServerConfigUtils.getLocalizerUpdateBlobInterval(conf);
blobDownloadRetries = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
@@ -270,6 +266,9 @@ public class AsyncLocalizer implements AutoCloseable {
}
private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
+
+ final long remoteBlobstoreUpdateTime = getRemoteBlobstoreUpdateTime();
+
CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()];
int i = 0;
for (final LocallyCachedBlob blob : blobs) {
@@ -280,29 +279,7 @@ public class AsyncLocalizer implements AutoCloseable {
long failures = 0;
while (!done) {
try {
- synchronized (blob) {
- if (blob.isUsed()) {
- long localVersion = blob.getLocalVersion();
- long remoteVersion = blob.getRemoteVersion(blobStore);
- if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
- if (blob.isFullyDownloaded()) {
- //Avoid case of different blob version
- // when blob is not downloaded (first time download)
- numBlobUpdateVersionChanged.mark();
- }
- Timer.Context t = singleBlobLocalizationDuration.time();
- try {
- long newVersion = blob.fetchUnzipToTemp(blobStore);
- blob.informReferencesAndCommitNewVersion(newVersion);
- t.stop();
- } finally {
- blob.cleanupOrphanedData();
- }
- }
- } else {
- LOG.debug("Skipping update of unused blob {}", blob);
- }
- }
+ blob.update(blobStore, remoteBlobstoreUpdateTime);
done = true;
} catch (Exception e) {
failures++;
@@ -321,6 +298,17 @@ public class AsyncLocalizer implements AutoCloseable {
return CompletableFuture.allOf(all);
}
+    /**
+     * Asks a client blobstore for the time the remote blobstore was last updated.
+     *
+     * @return the value reported by the client blobstore, or -1 when the lookup fails with an IOException.
+     */
+    private long getRemoteBlobstoreUpdateTime() {
+        long remoteUpdateTime = -1L;
+        try (ClientBlobStore store = getClientBlobStore()) {
+            try {
+                remoteUpdateTime = store.getRemoteBlobstoreUpdateTime();
+            } catch (IOException e) {
+                LOG.error("Failed to get remote blobstore update time", e);
+            }
+        }
+        return remoteUpdateTime;
+    }
+
/**
* Downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files
* with a suffix. The runnable is intended to be run periodically by a timer, created elsewhere.
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 0373fa2..cf53dff 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -13,6 +13,8 @@
package org.apache.storm.localizer;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -27,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.generated.AuthorizationException;
@@ -52,6 +53,9 @@ public abstract class LocallyCachedBlob {
private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis());
private final Histogram fetchingRate;
+ private final Meter numBlobUpdateVersionChanged;
+ private final Timer singleBlobLocalizationDuration;
+ protected long localUpdateTime = -1L;
/**
* Create a new LocallyCachedBlob.
@@ -63,6 +67,8 @@ public abstract class LocallyCachedBlob {
this.blobDescription = blobDescription;
this.blobKey = blobKey;
this.fetchingRate = metricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
+ this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
+ this.singleBlobLocalizationDuration = metricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
}
/**
@@ -310,6 +316,90 @@ public abstract class LocallyCachedBlob {
public abstract boolean isFullyDownloaded();
+    /**
+     * Decides whether the local copy of this blob has to be refreshed from the remote blobstore.
+     *
+     * <p>An unused blob never requires an update and a blob that is not fully downloaded always does. Otherwise the
+     * remote version is only queried when the remote blobstore update time is not positive or differs from the
+     * update time this blob was last found in sync with.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true if the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime)
+            throws KeyNotFoundException, AuthorizationException {
+        if (!isUsed()) {
+            return false;
+        }
+        if (!isFullyDownloaded()) {
+            return true;
+        }
+
+        // When nothing changed remotely since we last synced, skip the per-blob remote query. This reduces
+        // Hadoop namenode impact of 100's of supervisors querying multiple blobs.
+        boolean inSyncWithRemoteStore = remoteBlobstoreUpdateTime > 0 && localUpdateTime == remoteBlobstoreUpdateTime;
+        if (inSyncWithRemoteStore) {
+            LOG.debug("{} is up to date, blob localUpdateTime matches remote timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        boolean versionChanged = getLocalVersion() != getRemoteVersion(blobStore);
+        if (!versionChanged) {
+            // remember that we are up to date with respect to the last remote blobstore update
+            localUpdateTime = remoteBlobstoreUpdateTime;
+        }
+        return versionChanged;
+    }
+
+    /**
+     * Fetches the blob from the remote blobstore, commits it as the new local version and records the remote
+     * blobstore update time it is now in sync with.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime)
+        throws AuthorizationException, IOException, KeyNotFoundException {
+        // only a blob that was already fully downloaded counts as a version change
+        if (isFullyDownloaded()) {
+            numBlobUpdateVersionChanged.mark();
+        }
+        Timer.Context localizationTimer = singleBlobLocalizationDuration.time();
+        try {
+            informReferencesAndCommitNewVersion(fetchUnzipToTemp(blobStore));
+            localUpdateTime = remoteBlobstoreUpdateTime;
+            LOG.debug("local blob {} downloaded, in sync with remote blobstore to time {}", this, remoteBlobstoreUpdateTime);
+        } finally {
+            localizationTimer.stop();
+            cleanupOrphanedData();
+        }
+    }
+
+    /**
+     * Brings the local blob up to date, downloading it only when {@code requiresUpdate} says so. The check and
+     * the download run while holding this blob's monitor.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    public void update(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime)
+        throws KeyNotFoundException, AuthorizationException, IOException {
+        synchronized (this) {
+            if (!requiresUpdate(blobStore, remoteBlobstoreUpdateTime)) {
+                return;
+            }
+            download(blobStore, remoteBlobstoreUpdateTime);
+        }
+    }
+
static class DownloadMeta {
private final Path downloadPath;
private final long version;
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
index 156bf20..a6b0ba6 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
@@ -158,4 +158,9 @@ public class ServerConfigUtils {
public LocalState nimbusTopoHistoryStateImpl(Map<String, Object> conf) throws IOException {
return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"), true);
}
+
+ public static int getLocalizerUpdateBlobInterval(Map<String, Object> conf) {
+ return ObjectReader.getInt(conf.get(
+ DaemonConfig.SUPERVISOR_LOCALIZER_UPDATE_BLOB_INTERVAL_SECS), 30);
+ }
}
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index f97c868..3b434c3 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -120,6 +120,7 @@ public class AsyncLocalizerTest {
final int port = 8080;
ClientBlobStore blobStore = mock(ClientBlobStore.class);
+ when(blobStore.getRemoteBlobstoreUpdateTime()).thenReturn(-1L);
LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(jarBlob).when(victim).getTopoJar(topoId, localAssignment.get_owner());
@@ -148,18 +149,9 @@ public class AsyncLocalizerTest {
Future<Void> f = victim.requestDownloadBaseTopologyBlobs(pna, null);
f.get(20, TimeUnit.SECONDS);
- verify(jarBlob).fetchUnzipToTemp(any());
- verify(jarBlob).informReferencesAndCommitNewVersion(100L);
- verify(jarBlob).cleanupOrphanedData();
-
- verify(codeBlob).fetchUnzipToTemp(any());
- verify(codeBlob).informReferencesAndCommitNewVersion(200L);
- verify(codeBlob).cleanupOrphanedData();
-
- verify(confBlob).fetchUnzipToTemp(any());
- verify(confBlob).informReferencesAndCommitNewVersion(300L);
- verify(confBlob).cleanupOrphanedData();
-
+ verify(jarBlob).update(eq(blobStore), eq(-1L));
+ verify(codeBlob).update(eq(blobStore), eq(-1L));
+ verify(confBlob).update(eq(blobStore), eq(-1L));
} finally {
ReflectionUtils.setInstance(previousReflectionUtils);
ServerUtils.setInstance(previousServerUtils);
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
new file mode 100644
index 0000000..a5cbf9b
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+ private static ClientBlobStore blobStore = Mockito.mock(ClientBlobStore.class);
+ private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new LocalAssignment());
+ private static Map<String, Object> conf = new HashMap<>();
+
+ @Test
+ public void testNotUsed() throws KeyNotFoundException, AuthorizationException {
+ LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+ AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+ Assert.assertFalse(blob.isUsed());
+ Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+ }
+
+ @Test
+ public void testNotDownloaded() throws KeyNotFoundException, AuthorizationException {
+ LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+ AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+ blob.addReference(pna, null);
+ Assert.assertTrue(blob.isUsed());
+ Assert.assertFalse(blob.isFullyDownloaded());
+ Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+ }
+
+ @Test
+ public void testOutOfDate() throws KeyNotFoundException, AuthorizationException {
+ TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+ AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+ blob.addReference(pna, null);
+ Assert.assertTrue(blob.isUsed());
+ Assert.assertTrue(blob.isFullyDownloaded());
+
+ // validate blob needs update due to version mismatch
+ Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+ // when blob update time matches remote blobstore update time, validate blob
+ // will skip looking at remote version and assume it's up to date
+ blob.localUpdateTime = 101L;
+ Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+ // now when the update time on the remote blobstore differs, we should again see that the
+ // blob version differs from the remote blobstore
+ Assert.assertTrue(blob.requiresUpdate(blobStore, 102L));
+
+ // now validate we don't need any update as versions match, regardless of remote blobstore update time
+ blob.localVersion = blob.getRemoteVersion(blobStore);
+ Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+ Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+ Assert.assertFalse(blob.requiresUpdate(blobStore, 102L));
+ }
+
+ public class TestableBlob extends LocalizedResource {
+ long localVersion = 9L;
+
+ TestableBlob(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps, Map<String, Object> conf, String user, StormMetricsRegistry metricRegistry) {
+ super(key, localBaseDir, shouldUncompress, fsOps, conf, user, metricRegistry);
+ }
+
+ @Override
+ public boolean isFullyDownloaded() {
+ return true;
+ }
+
+ @Override
+ public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException {
+ return 10L;
+ }
+
+ @Override
+ public long getLocalVersion() {
+ return localVersion;
+ }
+ }
+}