Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/12/17 16:23:08 UTC

[GitHub] [storm] agresch opened a new pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

agresch opened a new pull request #3363:
URL: https://github.com/apache/storm/pull/3363


   ## What is the purpose of the change
   
   Reduce the load that file listing places on the Hadoop NameNode by checking a single timestamp first when updating blobs.  The Hadoop blobstore will update the modTime on the base blobstore directory any time a blob is updated.  Supervisors will fetch that timestamp once during AsyncLocalizer updateBlobs().  For each local blob, if the modTime recorded at its last check matches the current modTime, the supervisor will not query the remote Hadoop blob.  This reduces polling on the NameNode.
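A minimal, self-contained Java sketch of the check described above. It assumes an in-memory `FakeRemote` in place of the Hadoop blobstore and omits the `isUsed()`/`isFullyDownloaded()` checks. `FakeRemote`, `LocalBlob`, and `pollOnce` are hypothetical names, not Storm APIs. The sketch counts per-blob version queries to show that a second polling round with an unchanged blobstore timestamp makes none.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the idea; only requiresUpdate/localUpdateTime mirror names from the PR.
class BlobstoreUpdateTimeSketch {

    /** Stand-in for the remote blobstore; counts per-blob version lookups (the NameNode calls to avoid). */
    static final class FakeRemote {
        long updateTime = -1L;   // -1L means "unknown", which disables the shortcut
        long version = 1L;       // every blob shares one version here, for brevity
        int versionQueries = 0;

        long getRemoteVersion() {
            versionQueries++;
            return version;
        }
    }

    /** Stand-in for a fully downloaded, in-use local blob. */
    static final class LocalBlob {
        long localVersion;
        long localUpdateTime = -1L;

        LocalBlob(long localVersion) {
            this.localVersion = localVersion;
        }

        boolean requiresUpdate(FakeRemote remote, long remoteUpdateTime) {
            // Cheap path: nothing in the blobstore changed since this blob was last confirmed.
            if (remoteUpdateTime > 0 && localUpdateTime == remoteUpdateTime) {
                return false;
            }
            // Expensive path: ask the remote store for this blob's version.
            if (localVersion != remote.getRemoteVersion()) {
                return true;
            }
            localUpdateTime = remoteUpdateTime; // remember that we are in sync as of this time
            return false;
        }
    }

    /** One polling round: read the blobstore-wide time once, then check every blob. */
    static int pollOnce(FakeRemote remote, List<LocalBlob> blobs) {
        long remoteUpdateTime = remote.updateTime; // fetched once per round, not once per blob
        int needUpdate = 0;
        for (LocalBlob blob : blobs) {
            if (blob.requiresUpdate(remote, remoteUpdateTime)) {
                needUpdate++;
            }
        }
        return needUpdate;
    }

    public static void main(String[] args) {
        FakeRemote remote = new FakeRemote();
        remote.updateTime = 1000L;
        List<LocalBlob> blobs = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            blobs.add(new LocalBlob(1L));
        }
        pollOnce(remote, blobs); // first round: 50 version queries, records localUpdateTime
        pollOnce(remote, blobs); // second round: timestamp unchanged, no further queries
        System.out.println("version queries: " + remote.versionQueries); // version queries: 50
    }
}
```

A timestamp of 0 or less (for example `-1L` when it is unavailable) disables the shortcut, so every blob falls back to the per-blob version check.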
   
   ## How was the change tested
   
   Ran the code with debug logs on internal dev clusters with Hadoop and ran blobstore-related integration tests with topologies.  Ran the storm-client, storm-server, and storm-hdfs-blobstore unit tests.  Ran a word count topology on a local cluster for 15 minutes.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555840062



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);

Review comment:
       nit: can be changed to updateTime

##########
File path: storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
##########
@@ -182,5 +183,12 @@ public final void setBlobMeta(String key, SettableBlobMeta meta) throws Authoriz
         void run(ClientBlobStore blobStore) throws Exception;
     }
 
-
+    /**
+     * Client facing API to get the last update time of existing blobs in a blobstore.  This only required for use on
+     * supervisors.
+     *
+     * @return the timestamp of when the blobstore was last modified.  -1L if the blobstore

Review comment:
       nit: last updated

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreUpdateTime);

Review comment:
       nit: updatedModTime can be changed to "localUpdateTime"

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -1427,6 +1427,18 @@ public void launchServer() throws Exception {
                     }
                 });
 
+            // Periodically make sure the blobstore modtime is up to date.  This could have failed if Nimbus encountered
+            // an exception updating the mod time, or due to bugs causing a missed update of the blobstore mod time on a blob

Review comment:
       nit: both "mod time" occurrences can be changed to "update time"
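The periodic repair described in the quoted Nimbus comment could look roughly like the following sketch. It is an assumption, not the PR's code: `validate`, the 60-second period, and comparing the recorded time against per-blob modification times are illustrative choices. If any blob is newer than the recorded update time (a bump was lost), the recorded time moves forward so supervisors re-check versions.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names and the schedule are illustrative, not taken from Nimbus.
class UpdateTimeValidatorSketch {

    /** Returns the update time that should be recorded after validation. */
    static long validate(long recordedUpdateTime, Collection<Long> blobModTimes, long now) {
        if (recordedUpdateTime <= 0) {
            return now; // nothing recorded yet: record a time now
        }
        for (long modTime : blobModTimes) {
            if (modTime > recordedUpdateTime) {
                return now; // a change was missed; force supervisors to re-check versions
            }
        }
        return recordedUpdateTime; // consistent; leave it so supervisors keep skipping remote queries
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong recorded = new AtomicLong(100L);
        List<Long> blobModTimes = List.of(90L, 150L); // 150 > 100: an update was not recorded
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> recorded.set(validate(recorded.get(), blobModTimes, System.currentTimeMillis())),
            0, 60, TimeUnit.SECONDS);
        Thread.sleep(200); // let the first run (initial delay 0) complete
        scheduler.shutdownNow();
        System.out.println("recorded update time: " + recorded.get());
    }
}
```

Leaving the recorded time untouched when it is already consistent matters: every bump forces all supervisors back onto the per-blob version check for one round.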

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, modTimeFile);
+            return -1L;
+        }
+    }
+
+    /**
+     * Updates the last updated time of existing blobstores to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public synchronized void updateLastBlobUpdateTime() throws IOException {
+        Long timestamp = Time.currentTimeMillis();
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);

Review comment:
       nit: can be changed to updateTimeFile
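The quoted `updateLastBlobUpdateTime()` stores the timestamp in a dedicated file. A local-filesystem sketch of that write follows; it uses `java.nio` instead of Hadoop's `FileSystem`, and the file name is hypothetical. It writes to a temporary file and renames it over the target so a concurrent reader never sees a half-written value. Rename semantics on HDFS may differ from POSIX, so this only illustrates the idea.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local analogue using java.nio; UPDATE_TIME_FILE is a hypothetical name.
class UpdateTimeWriterSketch {
    static final String UPDATE_TIME_FILE = "blobstore-update-time";

    /** Writes the timestamp to a temp file, then atomically renames it over the real file. */
    static synchronized void updateLastBlobUpdateTime(Path blobstoreDir, long timestamp) throws IOException {
        Path updateTimeFile = blobstoreDir.resolve(UPDATE_TIME_FILE);
        Path tmp = Files.createTempFile(blobstoreDir, UPDATE_TIME_FILE, ".tmp");
        try {
            Files.write(tmp, Long.toString(timestamp).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, updateTimeFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blobstore");
        updateLastBlobUpdateTime(dir, System.currentTimeMillis());
        System.out.println(new String(Files.readAllBytes(dir.resolve(UPDATE_TIME_FILE)), StandardCharsets.UTF_8));
    }
}
```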

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -1427,6 +1427,18 @@ public void launchServer() throws Exception {
                     }
                 });
 
+            // Periodically make sure the blobstore modtime is up to date.  This could have failed if Nimbus encountered

Review comment:
       nit: modtime can be changed to "update time"

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, modTimeFile);

Review comment:
       nit: can be changed to update time
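A local sketch of the quoted read path, where a missing file or unparseable content yields `-1L`. It uses `java.nio` in place of Hadoop's `FileSystem`, and the file name is hypothetical. `Files.readAllBytes` closes the file even when reading fails; in the quoted version, `inputStream.close()` is skipped if `IOUtils.toString` throws, which a try-with-resources block around `fileSystem.open(...)` would avoid.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Local analogue of getLastBlobUpdateTime(); UPDATE_TIME_FILE is a hypothetical name.
class UpdateTimeReaderSketch {
    static final String UPDATE_TIME_FILE = "blobstore-update-time";

    /** Returns the recorded update time, or -1L if the file is missing or does not hold a valid long. */
    static long getLastBlobUpdateTime(Path blobstoreDir) throws IOException {
        Path updateTimeFile = blobstoreDir.resolve(UPDATE_TIME_FILE);
        if (!Files.exists(updateTimeFile)) {
            return -1L;
        }
        // readAllBytes opens and closes the file itself, so nothing leaks if reading fails
        String timestamp = new String(Files.readAllBytes(updateTimeFile), StandardCharsets.UTF_8).trim();
        try {
            return Long.parseLong(timestamp);
        } catch (NumberFormatException e) {
            System.err.println("Invalid blobstore update time '" + timestamp + "' in " + updateTimeFile);
            return -1L;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blobstore");
        System.out.println(getLastBlobUpdateTime(dir)); // -1
        Files.write(dir.resolve(UPDATE_TIME_FILE), "1608222188000\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(getLastBlobUpdateTime(dir)); // 1608222188000
    }
}
```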

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last update time of any blob.
+     *
+     * @return the last updated time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastBlobUpdateTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_UPDATE_TIME_FILE);

Review comment:
       nit: can be changed to updateTimeFile

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -261,6 +257,9 @@ private LocalizedResource getUserFile(String user, String key) {
     }
 
     private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
+
+        final long remoteBlobstoreModTime = getRemoteBlobstoreUpdateTime();

Review comment:
       nit: `remoteBlobstoreModTime` can be changed to `remoteBlobstoreUpdateTime`

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -312,6 +289,17 @@ private LocalizedResource getUserFile(String user, String key) {
         return CompletableFuture.allOf(all);
     }
 
+    private long getRemoteBlobstoreUpdateTime() {
+        try (ClientBlobStore blobStore = getClientBlobStore()) {
+            try {
+                return blobStore.getRemoteBlobstoreUpdateTime();
+            } catch (IOException e) {
+                LOG.error("Failed to get remote blobstore modtime", e);

Review comment:
       nit: can change `modtime` to "update time"
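The quoted hunk ends at the log call, so its fallback value is not visible here. The sketch below assumes the method returns `-1L` on failure, which would make every blob fall back to the per-blob version check rather than skip updates. `UpdateTimeSource` is a hypothetical stand-in for `ClientBlobStore`.

```java
import java.io.IOException;

// Hypothetical sketch; returning -1L on failure is an assumption, not confirmed by the quoted hunk.
class RemoteUpdateTimeFetchSketch {

    interface UpdateTimeSource extends AutoCloseable {
        long getRemoteBlobstoreUpdateTime() throws IOException;

        @Override
        void close(); // narrowed to throw nothing, so try-with-resources needs no extra catch
    }

    /** Fetches the blobstore-wide update time once per polling round; -1L means "unknown, check every blob". */
    static long fetchUpdateTime(UpdateTimeSource source) {
        try (UpdateTimeSource s = source) {
            return s.getRemoteBlobstoreUpdateTime();
        } catch (IOException e) {
            System.err.println("Failed to get remote blobstore update time: " + e);
            return -1L; // disables the timestamp shortcut instead of skipping updates
        }
    }

    public static void main(String[] args) {
        UpdateTimeSource failing = new UpdateTimeSource() {
            public long getRemoteBlobstoreUpdateTime() throws IOException {
                throw new IOException("namenode unavailable");
            }

            public void close() { }
        };
        System.out.println(fetchUpdateTime(failing)); // -1
    }
}
```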

##########
File path: storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
##########
@@ -182,5 +183,12 @@ public final void setBlobMeta(String key, SettableBlobMeta meta) throws Authoriz
         void run(ClientBlobStore blobStore) throws Exception;
     }
 
-
+    /**
+     * Client facing API to get the last update time of existing blobs in a blobstore.  This only required for use on

Review comment:
       nit: "This only required" should be "This is only required"

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        long localVersion = this.getLocalVersion();
+        long remoteVersion = this.getRemoteVersion(blobStore);
+        if (localVersion != remoteVersion) {
+            return true;
+        } else {
+            // track that we are now up to date with respect to last time the remote blobstore was updated
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            return false;
+        }
+    }
+
+    /**
+     * Downloads a blob locally.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long remoteBlobstoreModTime)

Review comment:
       nit: can be changed to `remoteBlobstoreUpdateTime`

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -271,29 +270,7 @@ private LocalizedResource getUserFile(String user, String key) {
                     long failures = 0;
                     while (!done) {
                         try {
-                            synchronized (blob) {
-                                if (blob.isUsed()) {
-                                    long localVersion = blob.getLocalVersion();
-                                    long remoteVersion = blob.getRemoteVersion(blobStore);
-                                    if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
-                                        if (blob.isFullyDownloaded()) {
-                                            //Avoid case of different blob version
-                                            // when blob is not downloaded (first time download)
-                                            numBlobUpdateVersionChanged.mark();
-                                        }
-                                        Timer.Context t = singleBlobLocalizationDuration.time();
-                                        try {
-                                            long newVersion = blob.fetchUnzipToTemp(blobStore);
-                                            blob.informReferencesAndCommitNewVersion(newVersion);
-                                            t.stop();
-                                        } finally {
-                                            blob.cleanupOrphanedData();
-                                        }
-                                    }
-                                } else {
-                                    LOG.debug("Skipping update of unused blob {}", blob);
-                                }
-                            }
+                            blob.update(blobStore, remoteBlobstoreModTime);

Review comment:
       nit: can be changed to `remoteBlobstoreUpdateTime`
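Moving the removed inline logic into `blob.update(...)` keeps the check and the download under one lock per blob. In the hypothetical sketch below, `LongSupplier` and `Downloader` stand in for the remote version query and for `fetchUnzipToTemp` plus the commit step. It shows why the local update time should be recorded only after the new version is committed: a failed download leaves the blob out of sync, so the next round retries it.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; not the PR's LocallyCachedBlob.
class BlobUpdateFlowSketch {

    /** Stand-in for fetching, unzipping and committing a new version; returns the committed version. */
    interface Downloader {
        long downloadAndCommit() throws Exception;
    }

    long localVersion = -1L;     // -1L: never fully downloaded (assumption for this sketch)
    long localUpdateTime = -1L;  // blobstore update time this blob was last confirmed against

    private boolean requiresUpdate(LongSupplier remoteVersion, long remoteUpdateTime) {
        if (localVersion < 0) {
            return true;                     // not fully downloaded yet
        }
        if (remoteUpdateTime > 0 && localUpdateTime == remoteUpdateTime) {
            return false;                    // nothing changed remotely; skip the remote query
        }
        if (localVersion != remoteVersion.getAsLong()) {
            return true;                     // remote version moved on
        }
        localUpdateTime = remoteUpdateTime;  // confirmed in sync as of this time
        return false;
    }

    /** Check and download under one lock, as the old loop body did with synchronized (blob). */
    synchronized void update(LongSupplier remoteVersion, Downloader downloader, long remoteUpdateTime)
            throws Exception {
        if (!requiresUpdate(remoteVersion, remoteUpdateTime)) {
            return;
        }
        long newVersion = downloader.downloadAndCommit(); // if this throws, nothing below runs
        localVersion = newVersion;
        localUpdateTime = remoteUpdateTime;               // recorded only after a successful commit
    }

    public static void main(String[] args) throws Exception {
        BlobUpdateFlowSketch blob = new BlobUpdateFlowSketch();
        try {
            blob.update(() -> 5L, () -> { throw new java.io.IOException("download failed"); }, 100L);
        } catch (java.io.IOException e) {
            System.out.println("failed, localUpdateTime still " + blob.localUpdateTime);
        }
        blob.update(() -> 5L, () -> 5L, 100L);
        System.out.println("version " + blob.localVersion + " synced at " + blob.localUpdateTime);
    }
}
```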

##########
File path: storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate blob
+        // will skip looking at remote version and assume it's up to date
+        blob.localUpdateTime = 101L;
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+        // now when the mod time on the remote blobstore differs, we should again see that the

Review comment:
       nit: can change `mod time` to update time

##########
File path: storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate blob

Review comment:
       nit: can change `modtime` to update time

##########
File path: storm-server/src/test/java/org/apache/storm/localizer/LocallyCachedBlobTest.java
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class LocallyCachedBlobTest {
+    private static ClientBlobStore blobStore = Mockito.mock(ClientBlobStore.class);
+    private static PortAndAssignment pna = new PortAndAssignmentImpl(6077, new LocalAssignment());
+    private static Map<String, Object> conf = new HashMap<>();
+
+    @Test
+    public void testNotUsed() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        Assert.assertFalse(blob.isUsed());
+        Assert.assertFalse(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testNotDownloaded() throws KeyNotFoundException, AuthorizationException {
+        LocallyCachedBlob blob = new LocalizedResource("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertFalse(blob.isFullyDownloaded());
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+    }
+
+    @Test
+    public void testOutOfDate() throws KeyNotFoundException, AuthorizationException {
+        TestableBlob blob = new TestableBlob("key", Paths.get("/bogus"), false,
+                AdvancedFSOps.make(conf), conf, "user1", new StormMetricsRegistry());
+        blob.addReference(pna, null);
+        Assert.assertTrue(blob.isUsed());
+        Assert.assertTrue(blob.isFullyDownloaded());
+
+        // validate blob needs update due to version mismatch
+        Assert.assertTrue(blob.requiresUpdate(blobStore, -1L));
+
+        // when blob update time matches remote blobstore modtime, validate blob
+        // will skip looking at remote version and assume it's up to date
+        blob.localUpdateTime = 101L;
+        Assert.assertFalse(blob.requiresUpdate(blobStore, 101L));
+
+        // now when the mod time on the remote blobstore differs, we should again see that the
+        // blob version differs from the remote blobstore
+        Assert.assertTrue(blob.requiresUpdate(blobStore, 102L));
+
+        // now validate we don't need any update as versions match, regardless of remote blobstore mod time

Review comment:
       nit: can change mod time to update time

##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreUpdateTime last update time of remote blobstore
+     * @return true of the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreUpdateTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreUpdateTime > 0 && this.localUpdateTime == remoteBlobstoreUpdateTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreUpdateTime);
+            return false;
+        }
+
+        long localVersion = this.getLocalVersion();
+        long remoteVersion = this.getRemoteVersion(blobStore);
+        if (localVersion != remoteVersion) {
+            return true;
+        } else {
+            // track that we are now up to date with respect to last time the remote blobstore was updated
+            this.localUpdateTime = remoteBlobstoreUpdateTime;
+            return false;
+        }
+    }
+
+    /**
+     * Downloads a blob locally.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    private void download(ClientBlobStore blobStore, long remoteBlobstoreModTime)
+            throws AuthorizationException, IOException, KeyNotFoundException {
+        if (this.isFullyDownloaded()) {
+            numBlobUpdateVersionChanged.mark();
+        }
+        Timer.Context timer = singleBlobLocalizationDuration.time();
+        try {
+            long newVersion = this.fetchUnzipToTemp(blobStore);
+            this.informReferencesAndCommitNewVersion(newVersion);
+            this.localUpdateTime = remoteBlobstoreModTime;
+            LOG.debug("local blob {} downloaded, in sync with remote blobstore to time {}", this, remoteBlobstoreModTime);
+        } finally {
+            timer.stop();
+            this.cleanupOrphanedData();
+        }
+    }
+
+    /**
+     * Checks and downloads a blob locally as necessary.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization is failed
+     * @throws IOException on errors
+     */
+    public void update(ClientBlobStore blobStore, long remoteBlobstoreModTime)

Review comment:
       nit: can be changed to `remoteBlobstoreUpdateTime`

##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastBlobUpdateTime();

Review comment:
       Do we need this on `updateBlobReplication` method too?





[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555271721



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       What will happen if `blobStore.setBlobMeta` succeeds and then `blobStore.updateLastModTime()` fails? Or if Nimbus crashes before `blobStore.updateLastModTime()` can run?

       It looks like supervisors will not be notified of this blob update in that case. We should update the timestamp before doing the actual operation.

       Also, do we need to update the timestamp in `updateBlobReplication(String key, int replication)`?
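The ordering concern can be checked with a small in-memory model (hypothetical, not Nimbus code). A supervisor that last synced at update time 100 only re-checks a blob if the blobstore-wide time changed. Bumping the time last and crashing in between leaves a changed blob that supervisors keep skipping; bumping first turns the same crash into a harmless extra version check.

```java
// Hypothetical model of the two orderings; Store and the crash flag are illustrative.
class BumpOrderingSketch {

    static final class Store {
        long updateTime = 100L;
        long blobVersion = 1L;
    }

    /** True if a supervisor that last synced at syncedUpdateTime would query the blob's version. */
    static boolean supervisorWouldRecheck(Store store, long syncedUpdateTime) {
        return !(store.updateTime > 0 && store.updateTime == syncedUpdateTime);
    }

    /** Applies a blob change, bumping the timestamp first or last; crashBetween stops after the first step. */
    static void update(Store store, boolean bumpFirst, boolean crashBetween, long now) {
        if (bumpFirst) {
            store.updateTime = now;
            if (crashBetween) {
                return;
            }
            store.blobVersion++;
        } else {
            store.blobVersion++;
            if (crashBetween) {
                return;
            }
            store.updateTime = now;
        }
    }

    public static void main(String[] args) {
        Store bumpLast = new Store();
        update(bumpLast, false, true, 200L);
        System.out.println("bump last + crash: version=" + bumpLast.blobVersion
            + ", recheck=" + supervisorWouldRecheck(bumpLast, 100L)); // version=2, recheck=false
        Store bumpFirst = new Store();
        update(bumpFirst, true, true, 200L);
        System.out.println("bump first + crash: version=" + bumpFirst.blobVersion
            + ", recheck=" + supervisorWouldRecheck(bumpFirst, 100L)); // version=1, recheck=true
    }
}
```

Bumping first does not close every window: a supervisor that polls between the bump and the write can record the new timestamp against the old version, so a periodic check that re-bumps the timestamp when a blob is newer than it remains useful.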
   





[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555136615



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
##########
@@ -404,4 +404,18 @@ public void writeMetadata(String key, SettableBlobMeta meta)
     public void fullCleanup(long age) throws IOException {
         hbs.fullCleanup(age);
     }
+
+    public long getLastModeTime() throws IOException {

Review comment:
       Nit: This should be `getLastModTime`

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);
+        LOG.debug("Updated blobstore modtime of {} to {}", fullPath, timestamp);
+    }
+
+    /**
+     * Validates that the modification time of the blobstore is up to date with the current existing blobs.
+     *
+     * @throws IOException on any error
+     */
+    public void validateModTime() throws IOException {
+        int currentBucket = 0;
+        long baseModTime = 0;
+        while (currentBucket < BUCKETS) {
+            String name = String.valueOf(currentBucket);
+            Path bucketDir = new Path(fullPath, name);
+
+            // only consider bucket dirs that exist with files in them
+            if (fileSystem.exists(bucketDir) && fileSystem.listStatus(bucketDir).length > 0) {
+                long modtime = fileSystem.getFileStatus(bucketDir).getModificationTime();

Review comment:
       Do you mean the modification timestamp of `bucketDir`? 
   Its modification time will not change if the content of a blob in the directory changes. If a blob is added or deleted in the directory, the modification time of the directory (`bucketDir`) will change. But here we ignore any directory that has no blobs, which means the following case is not captured: all blobs in the directory are deleted.
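   To make the empty-bucket case concrete, here is a local-disk sketch of the bucket scan in validateModTime(). It uses java.nio.file instead of Hadoop's FileSystem. The bucket layout (directories named 0 to n-1 under the base path) follows the diff. The class name and the includeEmpty switch are hypothetical. With includeEmpty set to false, as in the diff, a bucket whose blobs were all deleted is skipped, even though its directory has the newest mtime.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical local-disk model of the validateModTime() bucket loop.
final class BucketModTimeScan {
    // Newest mtime (ms) among bucket dirs 0..buckets-1 that exist.
    // includeEmpty=false reproduces the diff's "only dirs with files in them" filter.
    static long newestBucketModTime(Path base, int buckets, boolean includeEmpty) throws IOException {
        long newest = 0L;
        for (int i = 0; i < buckets; i++) {
            Path bucketDir = base.resolve(String.valueOf(i));
            if (!Files.isDirectory(bucketDir)) {
                continue; // bucket never created
            }
            if (!includeEmpty && isEmpty(bucketDir)) {
                continue; // an emptied bucket (all blobs deleted) is ignored here
            }
            newest = Math.max(newest, Files.getLastModifiedTime(bucketDir).toMillis());
        }
        return newest;
    }

    private static boolean isEmpty(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        }
    }
}
```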
   







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r550682688



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);
+        LOG.debug("Updated blobstore modtime of {} to {}", fullPath, timestamp);
+    }
+
+    /**
+     * Validates that the modification time of the blobstore is up to date with the current existing blobs.
+     *
+     * @throws IOException on any error
+     */
+    public void validateModTime() throws IOException {
+        int currentBucket = 0;
+        long baseModTime = 0;
+        while (currentBucket < BUCKETS) {
+            String name = String.valueOf(currentBucket);
+            Path bucketDir = new Path(fullPath, name);
+
+            // only consider bucket dirs that exist with files in them
+            if (fileSystem.exists(bucketDir) && fileSystem.listStatus(bucketDir).length > 0) {
+                long modtime = fileSystem.getFileStatus(bucketDir).getModificationTime();

Review comment:
       The modification timestamp of the directory updates when a blob in the directory changes or is deleted.







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r546884368



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     * @return true if the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreModTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreModTime > 0 && this.updatedModTime == remoteBlobstoreModTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreModTime);
+            return false;
+        }
+

Review comment:
       Line 326 is an early out that, for now, will only occur for HDFS blobstores. Unsupported blobstores, which report a remoteBlobstoreModTime of -1, should behave as they did before this change: they always check the local version against the remote version of the blob.
   
   Line 337 should work regardless of what value is passed for remoteBlobstoreModTime.
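   The early-out condition from the diff can be isolated to check the -1 behavior. In this sketch (class and method names are hypothetical), a supervisor polls for several rounds and counts how many per-blob remote queries it would make. It simplifies the real code by recording the blobstore timestamp after every query, whether or not a download followed.

```java
// Hypothetical isolation of the requiresUpdate() early out; not Storm code.
final class RemoteCheckPolicy {
    // Mirrors the diff: skip the per-blob remote query only when the blobstore
    // reports a real timestamp (> 0) that matches the one recorded locally.
    static boolean canSkipRemoteQuery(long localUpdatedModTime, long remoteBlobstoreModTime) {
        return remoteBlobstoreModTime > 0 && localUpdatedModTime == remoteBlobstoreModTime;
    }

    // Counts remote queries over successive polling rounds, given the blobstore
    // timestamp each round observes (-1 = blobstore does not support mod times).
    static int countRemoteQueries(long[] remoteModTimePerRound) {
        long recorded = 0L; // nothing recorded before the first round
        int queries = 0;
        for (long remote : remoteModTimePerRound) {
            if (canSkipRemoteQuery(recorded, remote)) {
                continue;
            }
            queries++;         // the real code would compare local and remote versions here
            recorded = remote; // simplification: record the timestamp seen this round
        }
        return queries;
    }
}
```

   With rounds {100, 100, 100, 200, 200} it makes 2 queries. With {-1, -1, -1} it makes 3, so an unsupported blobstore keeps the pre-change behavior without any special case.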







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555287880



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       Yes, validateModTime() should update the mod time if a file in the bucket dir is updated. There will also be occasional false-positive updates when a blob is deleted from a bucket dir that still contains another blob file.







[GitHub] [storm] bipinprasad commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r546855901



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     * @return true if the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreModTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreModTime > 0 && this.updatedModTime == remoteBlobstoreModTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreModTime);
+            return false;
+        }
+

Review comment:
       For unsupported blobstores, remoteBlobstoreModTime will be returned as -1. Does that need special-case processing here or at line 337?







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555278517



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       If updateLastModTime() fails, Nimbus should eventually be restarted or HDFS fixed, and Nimbus will then call blobStore.validateModTime(), which should repair the timestamp. In the meantime, supervisors will remain on an older version of the blob.
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r549128188



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);
+        LOG.debug("Updated blobstore modtime of {} to {}", fullPath, timestamp);
+    }
+
+    /**
+     * Validates that the modification time of the blobstore is up to date with the current existing blobs.
+     *
+     * @throws IOException on any error
+     */
+    public void validateModTime() throws IOException {
+        int currentBucket = 0;
+        long baseModTime = 0;
+        while (currentBucket < BUCKETS) {
+            String name = String.valueOf(currentBucket);
+            Path bucketDir = new Path(fullPath, name);
+
+            // only consider bucket dirs that exist with files in them
+            if (fileSystem.exists(bucketDir) && fileSystem.listStatus(bucketDir).length > 0) {
+                long modtime = fileSystem.getFileStatus(bucketDir).getModificationTime();

Review comment:
       This looks at the mod time of the bucketDir.  But the blob files can be updated. 
   https://git.vzbuilders.com/storm/storm/blob/master/docs/distcache-blobstore.md#updating-a-cached-file
   
   Will this work properly in this case? Or does the code change at Nimbus.java serve the purpose?
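   Whether validateModTime() sees a content update depends on file-system semantics. On a local POSIX file system, a directory's mtime changes when entries are created, removed or renamed. It does not change when an existing file's bytes are rewritten in place. The probe below (hypothetical class name) demonstrates both cases with java.nio.file. It says nothing about HDFS, and it does not establish whether the HDFS blobstore writes updates as new files or in place.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

// Hypothetical probe of local (POSIX) directory mtime semantics; not HDFS.
final class DirMtimeProbe {
    static final FileTime OLD = FileTime.fromMillis(1_000L);

    // Returns {changedByInPlaceWrite, changedByNewEntryAndRename}.
    static boolean[] probe(Path dir) throws IOException {
        Path blob = dir.resolve("blob.data");
        Files.write(blob, "v1".getBytes(StandardCharsets.UTF_8));

        // Case 1: rewrite the existing file in place; only the file's own inode changes.
        Files.setLastModifiedTime(dir, OLD);
        Files.write(blob, "v2".getBytes(StandardCharsets.UTF_8));
        boolean changedByInPlaceWrite = !Files.getLastModifiedTime(dir).equals(OLD);

        // Case 2: write a new file and rename it over the old one; directory entries change.
        Files.setLastModifiedTime(dir, OLD);
        Path tmp = dir.resolve("blob.tmp");
        Files.write(tmp, "v3".getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, blob, StandardCopyOption.REPLACE_EXISTING);
        boolean changedByRename = !Files.getLastModifiedTime(dir).equals(OLD);

        return new boolean[] {changedByInPlaceWrite, changedByRename};
    }
}
```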
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555285564



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       Thanks. My biggest question is: will the bucketDir's modTime update when the content of a file under the `bucketDir` updates? 
   I think your answer is yes.
   
   If that's so, I agree that `validateModTime` will serve the purpose.







[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555279002



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_MOD_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, modTimeFile);
+            return -1L;
+        }
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {

Review comment:
       What happens if this method is invoked by two threads at the same time? Do we need synchronization here?
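   One way to make concurrent calls safe is sketched below against a local file rather than HDFS. The class and file names are hypothetical. The method is synchronized, it never moves the stored time backwards, and it writes through a temp file plus rename, so readers never observe a partial value. Note that synchronized only serializes callers inside one JVM and does not coordinate separate Nimbus processes. Also, whether ATOMIC_MOVE replaces an existing target is platform-specific; it does on Linux.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local-file writer; not Storm's HdfsBlobStoreImpl.
final class SynchronizedModTimeWriter {
    private final Path modTimeFile;
    private long lastWritten = -1L;

    SynchronizedModTimeWriter(Path modTimeFile) {
        this.modTimeFile = modTimeFile;
    }

    // synchronized: concurrent callers in this JVM take turns, so writes never
    // interleave and a slower thread cannot move the stored time backwards.
    synchronized long update(long timestamp) throws IOException {
        long next = Math.max(timestamp, lastWritten);
        // Write a temp file in the same directory, then rename it over the target.
        Path tmp = Files.createTempFile(modTimeFile.getParent(), "modtime", ".tmp");
        Files.write(tmp, Long.toString(next).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, modTimeFile, StandardCopyOption.ATOMIC_MOVE);
        lastWritten = next;
        return next;
    }
}
```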







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r553613826



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);

Review comment:
       Updated with this change and addressed your other comments.







[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r552166555



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);

Review comment:
       I would personally prefer to use a separate file to record the metadata instead of modifying the mtime of a directory, since that doesn't follow the semantics of mtime. It doesn't seem to be a problem currently, but we don't know what we will do in the future, so I'd rather not break the semantics; it also avoids confusion. It is a personal preference, though.
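   A later revision of HdfsBlobStoreImpl in this PR follows this suggestion. In that revision, getLastModTime() reads a separate BLOBSTORE_MOD_TIME_FILE and returns -1L when the file is missing or unparsable. Below is a local-file sketch of that read path. It uses java.nio.file instead of Hadoop's FileSystem, and the class name is hypothetical. It uses try-with-resources, so the stream is closed even if reading throws. The explicit inputStream.close() in the diff does not guarantee that.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-file reader mirroring the -1L sentinel behavior in the diff.
final class ModTimeFileReader {
    static final long UNKNOWN = -1L;

    static long readModTime(Path modTimeFile) throws IOException {
        if (!Files.exists(modTimeFile)) {
            return UNKNOWN; // no file yet: callers fall back to per-blob checks
        }
        String line;
        // try-with-resources closes the reader even if readLine() throws.
        try (BufferedReader reader = Files.newBufferedReader(modTimeFile, StandardCharsets.UTF_8)) {
            line = reader.readLine();
        }
        if (line == null) {
            return UNKNOWN; // empty file
        }
        try {
            return Long.parseLong(line.trim());
        } catch (NumberFormatException e) {
            return UNKNOWN; // corrupt contents are treated like a missing file
        }
    }
}
```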







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r550682810



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);

Review comment:
       I am not clear what the concern is. Would you prefer I touch a file instead of a directory? Other suggestions?







[GitHub] [storm] agresch merged pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch merged pull request #3363:
URL: https://github.com/apache/storm/pull/3363


   





[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555288103



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +321,70 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        Path modTimeFile = new Path(fullPath, BLOBSTORE_MOD_TIME_FILE);
+        if (!fileSystem.exists(modTimeFile)) {
+            return -1L;
+        }
+        FSDataInputStream inputStream = fileSystem.open(modTimeFile);
+        String timestamp = IOUtils.toString(inputStream, "UTF-8");
+        inputStream.close();
+        try {
+            long modTime = Long.parseLong(timestamp);
+            return modTime;
+        } catch (NumberFormatException e) {
+            LOG.error("Invalid blobstore modtime {} in file {}", timestamp, modTimeFile);
+            return -1L;
+        }
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {

Review comment:
       validateModTime() would eventually fix things, but I will add synchronization.







[GitHub] [storm] agresch commented on pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on pull request #3363:
URL: https://github.com/apache/storm/pull/3363#issuecomment-758162473


   The intent of the mod time is to let supervisors know the last time a blob was uploaded or modified.  If no blob has been modified, the supervisor has no need to look at updating blobs.  Deleting blobs should not matter for this use case.  Possibly I could rename the API to getLastBlobUpdateTime()?  Would that be preferable?





[GitHub] [storm] Ethanlm commented on pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on pull request #3363:
URL: https://github.com/apache/storm/pull/3363#issuecomment-758152578


   I thought modification meant any changes to the blobs including add/update/delete. But if it doesn't, I still see two different behaviors from the current code regarding deletion:
   
   If the `bucketDir` is empty after deletion, this deletion is ignored and supervisors are not notified;
   if the `bucketDir` is not empty after deletion, this deletion will update the remoteLastModTime and supervisors will be notified. 
   
   So "modification" here sometimes includes deletion and sometimes doesn't. I would love it to be more consistent.
   
   Please correct me if I misunderstand anything. Thanks.
   
   Also, what will happen if a blob is deleted by the `blobstore delete` command?





[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555271721



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       What will happen if `blobStore.setBlobMeta` succeeds then `blobStore.updateLastModTime()` fails? Or nimbus crashed before `blobStore.updateLastModTime()` can run?
    
   It looks like supervisors will not be notified of this blob update. We should update the timestamp before doing the actual operation.
   
   Also, do we need similar code in `updateBlobReplication(String key, int replication)`?
   







[GitHub] [storm] agresch commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r552170314



##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);

Review comment:
       Will update the PR with your suggested change.







[GitHub] [storm] agresch commented on pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on pull request #3363:
URL: https://github.com/apache/storm/pull/3363#issuecomment-758041035


   Why would we need to copy a remote file if there are no files to copy?  I'm not following.  We should only need to update a local blob if files have been added or updated.
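The behavior described here (touch a local blob only when something remote has changed) can be sketched as a single update pass. This is a minimal simulation under stated assumptions, not the AsyncLocalizer code. `RemoteStore`, `LocalBlob`, `perBlobQueries`, and the integer versions standing in for blob contents are all hypothetical. The store-wide modtime is read once per pass, and the per-blob query is skipped for any blob that was already checked against that timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of one update cycle; not Storm's AsyncLocalizer code.
class UpdateCycleSketch {

    // Stand-in for the remote store: per-blob versions plus one store-wide modtime.
    static class RemoteStore {
        final Map<String, Long> versions = new HashMap<>();
        long modTime = 0;       // bumped on every blob write
        int perBlobQueries = 0; // counts the expensive per-blob lookups

        long queryVersion(String key) {
            perBlobQueries++;
            return versions.get(key);
        }

        void write(String key, long version, long now) {
            versions.put(key, version);
            modTime = now;
        }
    }

    // Stand-in for a locally cached blob.
    static class LocalBlob {
        final String key;
        long localVersion = 0;
        long updatedModTime = 0; // store modtime this blob was last checked against

        LocalBlob(String key) {
            this.key = key;
        }
    }

    // One update pass; returns the keys that were (re)downloaded.
    static List<String> updateBlobs(RemoteStore store, List<LocalBlob> blobs) {
        long storeModTime = store.modTime; // read once, before any per-blob query
        List<String> downloaded = new ArrayList<>();
        for (LocalBlob blob : blobs) {
            if (storeModTime > 0 && blob.updatedModTime == storeModTime) {
                continue; // nothing written since the last check: skip the remote query
            }
            long remoteVersion = store.queryVersion(blob.key);
            if (remoteVersion != blob.localVersion) {
                blob.localVersion = remoteVersion;
                downloaded.add(blob.key);
            }
            blob.updatedModTime = storeModTime;
        }
        return downloaded;
    }

    public static void main(String[] args) {
        RemoteStore store = new RemoteStore();
        store.write("a", 1, 100);
        store.write("b", 1, 100);
        List<LocalBlob> blobs = List.of(new LocalBlob("a"), new LocalBlob("b"));

        System.out.println(updateBlobs(store, blobs) + " queries=" + store.perBlobQueries); // [a, b] queries=2
        System.out.println(updateBlobs(store, blobs) + " queries=" + store.perBlobQueries); // [] queries=2
        store.write("b", 2, 200);
        System.out.println(updateBlobs(store, blobs) + " queries=" + store.perBlobQueries); // [b] queries=4
    }
}
```

In this sketch the store modtime is read before the per-blob queries on purpose. If a write lands mid-pass, the timestamp recorded for each blob is older than the new one, so the next pass re-queries instead of skipping.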





[GitHub] [storm] bipinprasad commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r546855901



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
##########
@@ -295,6 +301,90 @@ public String getKey() {
 
     public abstract boolean isFullyDownloaded();
 
+    /**
+     * Checks to see if the local blob requires update with respect to a remote blob.
+     *
+     * @param blobStore the client blobstore
+     * @param remoteBlobstoreModTime last modification time of remote blobstore
+     * @return true if the local blob requires update, false otherwise.
+     *
+     * @throws KeyNotFoundException if the remote blob is missing
+     * @throws AuthorizationException if authorization fails
+     */
+    boolean requiresUpdate(ClientBlobStore blobStore, long remoteBlobstoreModTime) throws KeyNotFoundException, AuthorizationException {
+        if (!this.isUsed()) {
+            return false;
+        }
+
+        if (!this.isFullyDownloaded()) {
+            return true;
+        }
+
+        // If we are already up to date with respect to the remote blob store, don't query
+        // the remote blobstore for the remote file.  This reduces Hadoop namenode impact of
+        // 100's of supervisors querying multiple blobs.
+        if (remoteBlobstoreModTime > 0 && this.updatedModTime == remoteBlobstoreModTime) {
+            LOG.debug("{} is up to date, blob updatedModTime matches remote timestamp {}", this, remoteBlobstoreModTime);
+            return false;
+        }
+

Review comment:
       For unsupported blobstores, remoteBlobstoreModTime will be returned as -1. Does that need special-case processing here?
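Reading only the lines quoted above, the `remoteBlobstoreModTime > 0` guard appears to cover -1 already: the equality check is never reached, so the blob falls through to the per-blob remote check. The sketch below restates just those early exits as a pure function so the cases can be tested. `RequiresUpdateSketch` and the `Decision` enum are hypothetical, and the code after the quoted lines is not modeled.

```java
// Hypothetical restatement of the early-exit checks quoted from requiresUpdate().
class RequiresUpdateSketch {
    enum Decision { SKIP_UNUSED, DOWNLOAD, SKIP_UP_TO_DATE, QUERY_REMOTE }

    static Decision decide(boolean used, boolean fullyDownloaded,
                           long updatedModTime, long remoteBlobstoreModTime) {
        if (!used) {
            return Decision.SKIP_UNUSED;
        }
        if (!fullyDownloaded) {
            return Decision.DOWNLOAD;
        }
        // "> 0" rejects -1 (unsupported) and 0, so a blob whose updatedModTime is
        // also -1 is never mistaken for being up to date.
        if (remoteBlobstoreModTime > 0 && updatedModTime == remoteBlobstoreModTime) {
            return Decision.SKIP_UP_TO_DATE;
        }
        // Otherwise fall through to the per-blob remote check.
        return Decision.QUERY_REMOTE;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, -1, -1));   // QUERY_REMOTE
        System.out.println(decide(true, true, 100, 100)); // SKIP_UP_TO_DATE
        System.out.println(decide(true, true, 100, 200)); // QUERY_REMOTE
    }
}
```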







[GitHub] [storm] agresch commented on pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on pull request #3363:
URL: https://github.com/apache/storm/pull/3363#issuecomment-758742659


   It looks like updateBlobReplication also writes to update metadata, so I will add a modtime update there as well.





[GitHub] [storm] agresch commented on pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
agresch commented on pull request #3363:
URL: https://github.com/apache/storm/pull/3363#issuecomment-759653709


   @Ethanlm - I've made the changes you suggested.





[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r555271721



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
##########
@@ -3815,6 +3828,7 @@ public void setBlobMeta(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException, TException {
         try {
             blobStore.setBlobMeta(key, meta, getSubject());
+            blobStore.updateLastModTime();

Review comment:
       What will happen if `blobStore.setBlobMeta` succeeds and then `blobStore.updateLastModTime()` fails? Or if Nimbus crashes before `blobStore.updateLastModTime()` can run?
    
   It looks like supervisors will not be notified of this blob update. We should update the timestamp before doing the actual operation.
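The two failure orderings raised here can be compared with a small in-memory model. This is a sketch under assumptions, not Nimbus or HdfsBlobStoreImpl code. `Store`, the failure flags, and `supervisorRechecks` are hypothetical, and a supervisor is assumed to re-check only when the store modtime differs from the one it last synced to.

```java
// Hypothetical in-memory model of the ordering question; not Nimbus or HdfsBlobStoreImpl.
class ModTimeOrderingSketch {
    static class Store {
        String meta = "v1";
        long modTime = 100;
        boolean failMetaWrite = false;     // simulate setBlobMeta failing
        boolean failModTimeUpdate = false; // simulate updateLastModTime failing (or a crash)

        void setBlobMeta(String newMeta) {
            if (failMetaWrite) {
                throw new IllegalStateException("meta write failed");
            }
            meta = newMeta;
        }

        void updateLastModTime(long now) {
            if (failModTimeUpdate) {
                throw new IllegalStateException("modtime update failed");
            }
            modTime = now;
        }
    }

    // Order in the quoted diff: write, then bump the modtime.
    static void writeThenBump(Store s, String meta, long now) {
        s.setBlobMeta(meta);
        s.updateLastModTime(now);
    }

    // Order suggested in the review: bump the modtime, then write.
    static void bumpThenWrite(Store s, String meta, long now) {
        s.updateLastModTime(now);
        s.setBlobMeta(meta);
    }

    // A supervisor that last synced at syncedModTime re-checks only if the modtime moved.
    static boolean supervisorRechecks(Store s, long syncedModTime) {
        return s.modTime != syncedModTime;
    }

    public static void main(String[] args) {
        Store a = new Store();
        a.failModTimeUpdate = true;
        try {
            writeThenBump(a, "v2", 200);
        } catch (IllegalStateException e) {
            // Meta changed but the modtime did not: supervisors never see the update.
            System.out.println(a.meta + " recheck=" + supervisorRechecks(a, 100)); // v2 recheck=false
        }

        Store b = new Store();
        b.failMetaWrite = true;
        try {
            bumpThenWrite(b, "v2", 200);
        } catch (IllegalStateException e) {
            // Modtime moved but meta did not: supervisors do one unnecessary check.
            System.out.println(b.meta + " recheck=" + supervisorRechecks(b, 100)); // v1 recheck=true
        }
    }
}
```

In this simplified model the suggested order trades a silently missed update for, at worst, an extra check. It does leave a short window in which a supervisor could read the new modtime before the write lands; whether that matters depends on how the real supervisor records the timestamp, which this model does not capture.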
   
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3363: STORM-3724 use blobstore modtime to prevent querying each remote file on update

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3363:
URL: https://github.com/apache/storm/pull/3363#discussion_r549123934



##########
File path: storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
##########
@@ -158,4 +158,9 @@ public LocalState supervisorStateImpl(Map<String, Object> conf) throws IOExcepti
     public LocalState nimbusTopoHistoryStateImpl(Map<String, Object> conf) throws IOException {
         return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"), true);
     }
+
+    public static long getLocalizerUpdateBlobInterval(Map<String, Object> conf) {
+        return ObjectReader.getInt(conf.get(

Review comment:
       This looks a little odd in how it switches between int and long. It can just be an int.
   https://github.com/apache/storm/blob/6e3707242e0e6835085d358bec0abee4f62b1426/storm-server/src/main/java/org/apache/storm/DaemonConfig.java#L769-L770
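A sketch of the int-only version suggested here, under assumptions. The config key is elided in the quoted diff, so `HYPOTHETICAL_INTERVAL_KEY` is a placeholder. `getInt` is a hypothetical stand-in for a getInt-style reader, not Storm's `ObjectReader`. The `defaultSecs` parameter is also an illustration-only addition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of keeping the interval an int end to end.
class UpdateIntervalConfigSketch {
    // Placeholder key for illustration; the real key is elided in the quoted diff.
    static final String HYPOTHETICAL_INTERVAL_KEY = "example.localizer.update.blob.interval.secs";

    // Stand-in for a getInt-style reader: null -> default, Number -> int.
    static int getInt(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            // Math.toIntExact throws ArithmeticException instead of silently truncating.
            return Math.toIntExact(((Number) value).longValue());
        }
        throw new IllegalArgumentException("Not a number: " + value);
    }

    // Returns int, matching what the reader produces, so no int/long round trip.
    static int getLocalizerUpdateBlobInterval(Map<String, Object> conf, int defaultSecs) {
        return getInt(conf.get(HYPOTHETICAL_INTERVAL_KEY), defaultSecs);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(getLocalizerUpdateBlobInterval(conf, 30)); // 30
        conf.put(HYPOTHETICAL_INTERVAL_KEY, 60L); // values may arrive as Long
        System.out.println(getLocalizerUpdateBlobInterval(conf, 30)); // 60
    }
}
```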
   

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();

Review comment:
       nit: extra space after `=`.

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);
+        LOG.debug("Updated blobstore modtime of {} to {}", fullPath, timestamp);
+    }
+
+    /**
+     * Validates that the modification time of the blobstore is up to date with the current existing blobs.
+     *
+     * @throws IOException on any error
+     */
+    public void validateModTime() throws IOException {
+        int currentBucket = 0;
+        long baseModTime = 0;
+        while (currentBucket < BUCKETS) {
+            String name = String.valueOf(currentBucket);
+            Path bucketDir = new Path(fullPath, name);
+
+            // only consider bucket dirs that exist with files in them
+            if (fileSystem.exists(bucketDir) && fileSystem.listStatus(bucketDir).length > 0) {
+                long modtime = fileSystem.getFileStatus(bucketDir).getModificationTime();

Review comment:
       This looks at the modtime of the bucketDir, but the blob files themselves can be updated:
   https://git.vzbuilders.com/storm/storm/blob/master/docs/distcache-blobstore.md#updating-a-cached-file
   
   Will this work properly in that case? Or does the code change in Nimbus.java serve that purpose?
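To make the question concrete, here is a sketch that assumes the truncated method ends by moving the base modtime forward to the newest non-empty bucket's mtime. That is a guess at the elided logic, and `ValidateModTimeSketch` is hypothetical. The sketch can only be as good as the timestamps fed into it. As we understand HDFS semantics, a directory's mtime changes when entries directly inside it are created, deleted, or renamed, not when a file deeper in the tree is rewritten. A content update that does not touch the bucket directory itself would therefore not show up here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a "newest bucket mtime vs. base mtime" check; not the real method.
class ValidateModTimeSketch {
    // bucketModTimes holds the mtime of each non-empty bucket directory (empty ones omitted).
    // Returns the base modtime the store should have after validation.
    static long validate(long baseModTime, Map<Integer, Long> bucketModTimes) {
        long newestBucket = 0;
        for (long modTime : bucketModTimes.values()) {
            newestBucket = Math.max(newestBucket, modTime);
        }
        // Only move the base forward; never roll it back.
        return Math.max(baseModTime, newestBucket);
    }

    public static void main(String[] args) {
        Map<Integer, Long> buckets = new HashMap<>();
        buckets.put(0, 50L);
        buckets.put(3, 150L);
        System.out.println(validate(100, buckets)); // 150: a bucket changed after the base
        System.out.println(validate(200, buckets)); // 200: base already newer
    }
}
```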
   

##########
File path: external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImpl.java
##########
@@ -312,4 +313,54 @@ public void shutdown() {
             timer.cancel();
         }
     }
+
+    /**
+     * Get the last modification time of any blob.
+     *
+     * @return the last modification time of blobs within the blobstore.
+     * @throws IOException on any error
+     */
+    public long getLastModTime() throws IOException {
+        long modtime =  fileSystem.getFileStatus(fullPath).getModificationTime();
+        return modtime;
+    }
+
+    /**
+     * Updates the modification time of the blobstore to the current time.
+     *
+     * @throws IOException on any error
+     */
+    public void updateLastModTime() throws IOException {
+        long timestamp = Time.currentTimeMillis();
+        fileSystem.setTimes(fullPath, timestamp, timestamp);

Review comment:
       I am not sure changing the mtime of the blobstore directory is a good idea. It breaks the semantics of a directory's modification time.



