Posted to reviews@spark.apache.org by "mridulm (via GitHub)" <gi...@apache.org> on 2023/09/10 14:26:56 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

mridulm commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1320766012


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
-        initiateRetry(e);
-      } else {
-        for (String bid : blockIdsToTransfer) {
-          listener.onBlockTransferFailure(bid, e);
+        try {
+          initiateRetry(e);
+          return;
+        } catch (Throwable t) {
+          logger.error("Exception while trying to initiate retry", t);

Review Comment:
   Note: I considered whether `e.addSuppressed(t);` would help here, but decided it would not be very useful.
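
For reference, a minimal sketch of what `e.addSuppressed(t)` would do at this point: it attaches the retry-initiation failure to the original transfer failure, so that whoever receives `e` can also see `t`. The class name, the always-failing `initiateRetry` stand-in, and the helper `failWithSuppressed` are hypothetical and are not part of `RetryingBlockTransferor`.

```java
import java.io.IOException;

public class SuppressedRetryFailureDemo {

  // Hypothetical stand-in for initiateRetry(e): it always fails,
  // which is the scenario this PR is concerned with.
  static void initiateRetry(Throwable cause) {
    throw new IllegalStateException("could not schedule retry");
  }

  // Returns the exception that would be passed on to the listener.
  static Throwable failWithSuppressed(IOException e) {
    try {
      initiateRetry(e);
    } catch (Throwable t) {
      // Record the secondary failure on the original exception
      // instead of only logging it.
      e.addSuppressed(t);
    }
    return e;
  }

  public static void main(String[] args) {
    Throwable reported = failWithSuppressed(new IOException("Connection failed"));
    for (Throwable s : reported.getSuppressed()) {
      System.out.println("suppressed: " + s);
    }
  }
}
```

Since the diff already logs `t` through `logger.error`, the suppressed exception would mostly duplicate information that is in the log, which is consistent with leaving it out.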



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
-        initiateRetry(e);
-      } else {
-        for (String bid : blockIdsToTransfer) {
-          listener.onBlockTransferFailure(bid, e);
+        try {
+          initiateRetry(e);
+          return;
+        } catch (Throwable t) {
+          logger.error("Exception while trying to initiate retry", t);
         }
       }
+
+      // retry is not possible, so fail remaining blocks
+      for (String bid : blockIdsToTransfer) {
+        listener.onBlockTransferFailure(bid, e);
+      }
     }
   }
 
   /**
    * Lightweight method which initiates a retry in a different thread. The retry will involve
    * calling transferAllOutstanding() after a configured wait time.
    */
-  private synchronized void initiateRetry(Throwable e) {
+  @VisibleForTesting
+  public synchronized void initiateRetry(Throwable e) {

Review Comment:
   ```suggestion
     synchronized void initiateRetry(Throwable e) {
   ```



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -365,13 +365,32 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
             ImmutableMap.of("b0", block0)
     );
     configMap.put("spark.shuffle.sasl.enableRetries", "true");
-    performInteractions(interactions, listener);
+    performInteractions(interactions, listener, false);
     verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
     verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
     assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
   }
 
+  @Test
+  public void testRetryInitiationFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // IOException will initiate a retry, but the initiation will fail
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new IOException("Connection failed or something"))
+            .put("b1", block1)
+            .build()
+    );
+
+    performInteractions(interactions, listener, true);

Review Comment:
   Using the proposed changes below, this will become:
   
   ```suggestion
       configureInteractions(interactions, listener);
       _retryingBlockTransferor = spy(_retryingBlockTransferor);
       // Throw an OOM when initiating retries.
       doThrow(OutOfMemoryError.class).when(_retryingBlockTransferor).initiateRetry(any());
       // Override listener, so that it delegates to the spied instance and not the original class.
       _retryingBlockTransferor.setCurrentListener(_retryingBlockTransferor.new RetryingBlockTransferListener());
       _retryingBlockTransferor.start();
   ```
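
The listener override in the suggestion above matters because a non-static inner class instance stays bound to the outer object that created it. The sketch below illustrates this in plain Java, without Mockito: `Transferor`, `Listener`, and `SpyLikeTransferor` are hypothetical stand-ins, and the subclass only approximates a spy by overriding a method and reusing the original object's listener field.

```java
public class InnerClassBindingDemo {

  static class Transferor {
    final String name;
    Listener currentListener;

    Transferor(String name) {
      this.name = name;
      // The listener created here is bound to this particular instance.
      this.currentListener = new Listener();
    }

    String initiateRetry() {
      return "retry on " + name;
    }

    class Listener {
      String onFailure() {
        // Resolves to Transferor.this.initiateRetry().
        return initiateRetry();
      }
    }
  }

  // Assumption: approximates a spy as a separate object that stubs a method
  // but carries over the listener created by the original instance.
  static class SpyLikeTransferor extends Transferor {
    SpyLikeTransferor(Transferor original) {
      super(original.name);
      this.currentListener = original.currentListener;
    }

    @Override
    String initiateRetry() {
      return "stubbed retry";
    }
  }

  public static void main(String[] args) {
    Transferor original = new Transferor("original");
    Transferor spyLike = new SpyLikeTransferor(original);

    // The carried-over listener still calls the original instance.
    System.out.println(spyLike.currentListener.onFailure()); // prints "retry on original"

    // Re-create the listener against the spy-like instance.
    spyLike.currentListener = spyLike.new Listener();
    System.out.println(spyLike.currentListener.onFailure()); // prints "stubbed retry"
  }
}
```

The `spyLike.new Listener()` expression has the same shape as `_retryingBlockTransferor.new RetryingBlockTransferListener()` in the suggestion: it creates the inner instance against a chosen outer object.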



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -80,7 +80,7 @@ public void testNoFailures() throws IOException, InterruptedException {
         .build()
       );
 
-    performInteractions(interactions, listener);
+    performInteractions(interactions, listener, false);

Review Comment:
   Let us revert all of these `performInteractions`-related diffs; more below.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -381,9 +400,12 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
    * If multiple interactions are supplied, they will be used in order. This is useful for encoding
    * retries -- the first interaction may include an IOException, which causes a retry of some
    * subset of the original blocks in a second interaction.
+   *
+   * If mockInitiateRetryFailure is set to true, we mock initiateRetry() and throw an exception.
    */
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
-                                          BlockFetchingListener listener)
+                                          BlockFetchingListener listener,
+                                          boolean mockInitiateRetryFailure)

Review Comment:
   Split `performInteractions` into `configureInteractions` and `performInteractions`.
   
   `configureInteractions` is the existing `performInteractions` in master, but without the `_retryingBlockTransferor.start();` call.
   `performInteractions` will then be:
   ```
     private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                             BlockFetchingListener listener)
       throws IOException, InterruptedException {
       configureInteractions(interactions, listener);
       _retryingBlockTransferor.start();
     }
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -149,6 +149,7 @@ public RetryingBlockTransferor(
    * in the event of transient IOExceptions.
    */
   public void start() {
+    currentListener = new RetryingBlockTransferListener();

Review Comment:
   Instead, let us explicitly introduce a package-private setter for use in tests, and remove this change:
   ```
   @VisibleForTesting
   synchronized void setCurrentListener(RetryingBlockTransferListener listener) {
     this.currentListener = listener;
   }
   ```
   
   See the comments on the Suite for how to use this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

