Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/01/29 18:20:51 UTC

[GitHub] [storm] agresch opened a new pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

agresch opened a new pull request #3374:
URL: https://github.com/apache/storm/pull/3374


   ## What is the purpose of the change
   
   When a worker slot is released, blob downloads may still be running in background threads.  These can add references to a blob after the worker is killed, causing the AsyncLocalizer to keep checking that blob for updates indefinitely.
   
   ## How was the change tested
   
   Ran storm-server unit tests and various internal blob/scheduling integration tests.
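
The race described above can be illustrated with a small standalone sketch. It is not the Storm code: the class name `CancelOnReleaseSketch`, the helper `cancelPending`, and the `referenceAdded` flag (a stand-in for "a reference was added to a blob") are all hypothetical, and the download is simulated with `Thread.sleep`. It assumes the download runs as a task submitted to an `ExecutorService`, where `cancel(true)` interrupts the running thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelOnReleaseSketch {

    /** Cancels every unfinished future and returns how many were cancelled. */
    static int cancelPending(Collection<Future<?>> pending) {
        int cancelled = 0;
        for (Future<?> future : pending) {
            if (!future.isDone() && future.cancel(true)) {
                cancelled++;
            }
        }
        return cancelled;
    }

    /** Simulates a slow download that is cancelled before it finishes. */
    static boolean referenceAddedAfterCancel() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean referenceAdded = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        Collection<Future<?>> pending = new ArrayList<>();

        pending.add(pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000);       // hypothetical stand-in for a slow download
                referenceAdded.set(true);  // hypothetical stand-in for adding a blob reference
            } catch (InterruptedException e) {
                // The task was cancelled: restore the flag and skip the reference.
                Thread.currentThread().interrupt();
            }
        }));

        started.await();        // wait until the "download" is actually running
        cancelPending(pending); // the equivalent of releasing the slot
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return referenceAdded.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reference added after cancel: " + referenceAddedAfterCancel());
    }
}
```

Without the call to `cancelPending`, the simulated download would finish after the "release" and set the flag, which is the late reference the change is meant to prevent. Note that this relies on the task responding to interruption; `CompletableFuture.cancel` documents that its `mayInterruptIfRunning` argument has no effect, so a download backed by a `CompletableFuture` would need its own check for cancellation.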


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] bipinprasad commented on a change in pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3374:
URL: https://github.com/apache/storm/pull/3374#discussion_r568281273



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -440,9 +442,23 @@ public void recoverRunningTopology(final LocalAssignment currentAssignment, fina
      *
      * @param assignment the assignment the resources are for
      * @param port       the port the topology is running on
+     * @param downloadingBlobs any existing downloading blob futures
      * @throws IOException on any error
      */
-    public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
+    public void releaseSlotFor(LocalAssignment assignment, int port,
+                               Collection<Future<Void>> downloadingBlobs) throws IOException {
+
+        // Make sure any downloading blobs in the background are stopped.
+        // This prevents a race condition where we could be adding references to a
+        // delayed downloading blob after the slot gets released, causing orphaned blobs.
+        for (Future future : downloadingBlobs) {
+            if (!future.isDone()) {
+                LOG.info("Canceling download of {}", future);
+                future.cancel(true);
+            }
+        }
+        downloadingBlobs.clear();

Review comment:
       Can this be an instance method in DynamicState?
   This instance method will also need to be called from drainAllChangingBlobs() just before a new DynamicState is returned.







[GitHub] [storm] agresch merged pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
agresch merged pull request #3374:
URL: https://github.com/apache/storm/pull/3374


   





[GitHub] [storm] agresch commented on a change in pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3374:
URL: https://github.com/apache/storm/pull/3374#discussion_r568705392



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -440,9 +442,23 @@ public void recoverRunningTopology(final LocalAssignment currentAssignment, fina
      *
      * @param assignment the assignment the resources are for
      * @param port       the port the topology is running on
+     * @param downloadingBlobs any existing downloading blob futures
      * @throws IOException on any error
      */
-    public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
+    public void releaseSlotFor(LocalAssignment assignment, int port,
+                               Collection<Future<Void>> downloadingBlobs) throws IOException {
+
+        // Make sure any downloading blobs in the background are stopped.
+        // This prevents a race condition where we could be adding references to a
+        // delayed downloading blob after the slot gets released, causing orphaned blobs.
+        for (Future future : downloadingBlobs) {
+            if (!future.isDone()) {
+                LOG.info("Canceling download of {}", future);
+                future.cancel(true);
+            }
+        }
+        downloadingBlobs.clear();

Review comment:
       I agree drainAllChangingBlobs() should also clean this up.  I will look into this.  Good find.







[GitHub] [storm] bipinprasad commented on a change in pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3374:
URL: https://github.com/apache/storm/pull/3374#discussion_r570438668



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
##########
@@ -1288,6 +1296,18 @@ public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBl
                                     pendingChangingBlobs,
                                     pendingChangingBlobsAssignment, this.slotMetrics);
         }
+
+        private void cancelPendingBlobs() {
+            // Make sure any downloading blobs in the background are stopped.
+            // This prevents a race condition where we could be adding references to a
+            // delayed downloading blob after the slot gets released, causing orphaned blobs.
+            for (Future future : pendingChangingBlobs) {
+                if (!future.isDone()) {
+                    LOG.info("Canceling download of {}", future);
+                    future.cancel(true);
+                }
+            }
+        }

Review comment:
       Does this pendingChangingBlobs set need to be cleared before exiting the method?







[GitHub] [storm] agresch closed pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
agresch closed pull request #3374:
URL: https://github.com/apache/storm/pull/3374


   





[GitHub] [storm] agresch commented on a change in pull request #3374: STORM-3740 cancel downloading blobs when releasing worker slot

Posted by GitBox <gi...@apache.org>.
agresch commented on a change in pull request #3374:
URL: https://github.com/apache/storm/pull/3374#discussion_r570497395



##########
File path: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
##########
@@ -1288,6 +1296,18 @@ public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBl
                                     pendingChangingBlobs,
                                     pendingChangingBlobsAssignment, this.slotMetrics);
         }
+
+        private void cancelPendingBlobs() {
+            // Make sure any downloading blobs in the background are stopped.
+            // This prevents a race condition where we could be adding references to a
+            // delayed downloading blob after the slot gets released, causing orphaned blobs.
+            for (Future future : pendingChangingBlobs) {
+                if (!future.isDone()) {
+                    LOG.info("Canceling download of {}", future);
+                    future.cancel(true);
+                }
+            }
+        }

Review comment:
       Clearing the set causes various assertion failures in the integration tests.  I tried creating new DynamicStates without any futures, and that caused other problems.  This is beyond my understanding, but having the futures canceled should prevent the race conditions.
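
As a hedged illustration of why canceling without clearing can still be enough: per the `java.util.concurrent.Future` contract, once `cancel` returns, `isDone()` always reports true, so a cancelled future left in the set looks finished to any later check. The sketch below uses a hypothetical class `CancelledFutureStateSketch` and a never-completed `CompletableFuture` as a stand-in for a pending download; it does not reproduce the Slot state machine or explain the test failures mentioned above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelledFutureStateSketch {

    /** Returns true when every future in the set reports isDone(). */
    static boolean allDone(Set<Future<Void>> pending) {
        for (Future<Void> future : pending) {
            if (!future.isDone()) {
                return false;
            }
        }
        return true;
    }

    /** Returns true when get() on the future throws CancellationException. */
    static boolean getThrowsCancellation(Future<Void> future) {
        try {
            future.get();
            return false;
        } catch (CancellationException e) {
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Set<Future<Void>> pending = new HashSet<>();
        // Hypothetical pending download: a future that nobody ever completes.
        CompletableFuture<Void> download = new CompletableFuture<>();
        pending.add(download);

        download.cancel(true);

        System.out.println("still in set: " + pending.contains(download));
        System.out.println("all done: " + allDone(pending));
        System.out.println("get() throws CancellationException: " + getThrowsCancellation(download));
    }
}
```

One consequence worth keeping in mind: any code that later calls `get()` on the retained futures has to be prepared for `CancellationException`.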



