Posted to reviews@spark.apache.org by "hdaikoku (via GitHub)" <gi...@apache.org> on 2023/08/10 08:04:45 UTC

[GitHub] [spark] hdaikoku opened a new pull request, #42426: [WIP][SPARK-44756] Executor hangs when RetryingBlockTransferor fails to initiate retry

hdaikoku opened a new pull request, #42426:
URL: https://github.com/apache/spark/pull/42426

   ### What changes were proposed in this pull request?
   This PR fixes a bug in `RetryingBlockTransferor` that could cause the executor to hang when retry initiation fails.
   
   ### Why are the changes needed?
   This is needed to catch any error thrown by `RetryingBlockTransferor.initiateRetry()` and invoke the exception handler of the parent listener.
   
   
   ### Does this PR introduce _any_ user-facing change?
   N/A
   
   ### How was this patch tested?
   WIP


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hdaikoku commented on a diff in pull request #42426: [SPARK-44756] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1289988068


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -149,6 +149,7 @@ public RetryingBlockTransferor(
    * in the event of transient IOExceptions.
    */
   public void start() {
+    currentListener = new RetryingBlockTransferListener();

Review Comment:
   This is needed to spy on `RetryingBlockTransferor`: in the constructor, `this` is still bound to the unspied instance, so the inner class instance also captures it, and [this](https://github.com/apache/spark/blob/a223bc8c34ce4028c95985439d7b671162e56571/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java#L286) will invoke the real implementation.
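The capture problem can be reproduced without Mockito. The sketch below is a plain-Java analog built on one stated assumption: `spy(obj)` is modeled as a subclass instance that skips the constructor logic and copies fields from the original (Mockito's documentation describes a spy as a copy of the real instance). `Transferor`, `Listener` and `StubbedCopy` are hypothetical names, not Spark classes.

```java
/**
 * Plain-Java analog (hypothetical names) of spying on an object whose inner-class
 * listener was created in the constructor.
 */
public class InnerClassCaptureDemo {

  static class Transferor {
    Listener currentListener;

    Transferor() {
      this(true);
    }

    protected Transferor(boolean createListener) {
      if (createListener) {
        // Captures Transferor.this, i.e. the original instance.
        currentListener = new Listener();
      }
    }

    String initiateRetry() {
      return "real";
    }

    void setCurrentListener(Listener listener) {
      this.currentListener = listener;
    }

    class Listener {
      String onFailure() {
        // Implicitly calls Transferor.this.initiateRetry() on the captured outer instance.
        return initiateRetry();
      }
    }
  }

  /** Models a spy: constructor logic not re-run, fields copied, one method stubbed. */
  static class StubbedCopy extends Transferor {
    StubbedCopy(Transferor original) {
      super(false);
      this.currentListener = original.currentListener; // still bound to the original
    }

    @Override
    String initiateRetry() {
      return "stubbed";
    }
  }

  /** Calls through the copied listener: the stub is bypassed. */
  static String viaCopiedListener() {
    Transferor copy = new StubbedCopy(new Transferor());
    return copy.currentListener.onFailure();
  }

  /** Re-creates the listener from the copy, so its outer instance is the copy. */
  static String viaRecreatedListener() {
    Transferor copy = new StubbedCopy(new Transferor());
    copy.setCurrentListener(copy.new Listener());
    return copy.currentListener.onFailure();
  }

  public static void main(String[] args) {
    System.out.println(viaCopiedListener());    // real
    System.out.println(viaRecreatedListener()); // stubbed
  }
}
```

Re-binding the listener to the copy, either by creating it in `start()` or through a test-only setter, is what lets a stubbed `initiateRetry()` be reached from the listener's failure path.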

[GitHub] [spark] hdaikoku commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1331742974


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -274,7 +287,13 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
       synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
           if (shouldRetry(exception)) {
-            initiateRetry(exception);
+            try {
+              initiateRetry(exception);
+            } catch (Throwable t) {
+              logger.error("Exception while trying to initiate retry", t);

Review Comment:
   Thank you for the suggestion, refactored accordingly: [32498ff](https://github.com/apache/spark/pull/42426/commits/32498ff9a906a21746b619f1ddd316777d188417)

[GitHub] [spark] mridulm closed pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry
URL: https://github.com/apache/spark/pull/42426

[GitHub] [spark] Ngone51 commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1723690792

   @mridulm This actually reminds me of the issue in https://github.com/apache/spark/pull/37779. In this case, if `execute()` had been used instead of `submit()`, I think `SparkUncaughtExceptionHandler` would have caught this OOM and aborted the executor. As a long-term item, I really think we should replace all unhandled `submit()` calls with `execute()` in Spark.
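To illustrate the difference being referred to: with `execute()`, an exception thrown inside the task escapes the worker thread and reaches its `UncaughtExceptionHandler`; with `submit()`, it is captured in the returned `Future` and never reaches the handler. A minimal sketch using only `java.util.concurrent`; the per-thread handler is a stand-in for `SparkUncaughtExceptionHandler`, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitVsExecuteDemo {

  /**
   * Runs one failing task with execute() or submit() and returns how many times
   * the worker thread's UncaughtExceptionHandler fired.
   */
  static int uncaughtHandlerCalls(boolean useExecute) {
    AtomicInteger calls = new AtomicInteger();
    List<Thread> threads = new CopyOnWriteArrayList<>();
    ThreadFactory factory = r -> {
      Thread t = new Thread(r);
      // Stand-in for a process-wide handler such as SparkUncaughtExceptionHandler.
      t.setUncaughtExceptionHandler((thread, e) -> calls.incrementAndGet());
      threads.add(t);
      return t;
    };
    ExecutorService pool = Executors.newSingleThreadExecutor(factory);
    Runnable failing = () -> {
      throw new IllegalStateException("boom");
    };
    if (useExecute) {
      pool.execute(failing); // the exception escapes the worker thread
    } else {
      pool.submit(failing);  // the exception is stored in the returned Future
    }
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
      for (Thread t : threads) {
        t.join(); // a thread's uncaught handler runs before the thread terminates
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println("submit():  " + uncaughtHandlerCalls(false)); // 0
    System.out.println("execute(): " + uncaughtHandlerCalls(true));  // 1
  }
}
```

This only covers exceptions raised while the task runs; it says nothing about `submit()` itself throwing before any task starts.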

[GitHub] [spark] mridulm commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1320766012


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
-        initiateRetry(e);
-      } else {
-        for (String bid : blockIdsToTransfer) {
-          listener.onBlockTransferFailure(bid, e);
+        try {
+          initiateRetry(e);
+          return;
+        } catch (Throwable t) {
+          logger.error("Exception while trying to initiate retry", t);

Review Comment:
   Note: I considered whether `e.addSuppressed(t);` would help here, but decided it probably would not be very useful.
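For reference, `Throwable.addSuppressed` (Java 7+) attaches a secondary throwable to a primary one without replacing it. A minimal sketch of the alternative considered above, using simulated exceptions; it is not code from the PR, and `SuppressedDemo` is a hypothetical name.

```java
import java.io.IOException;

public class SuppressedDemo {

  /** Attaches a (simulated) retry-initiation failure to the original transfer failure. */
  static Throwable reportedFailure() {
    IOException transferFailure = new IOException("Connection reset (simulated)");
    OutOfMemoryError retryFailure =
        new OutOfMemoryError("unable to create new native thread (simulated)");
    transferFailure.addSuppressed(retryFailure);
    // Listeners would still receive the IOException; the OOM is only visible via
    // getSuppressed() or in the "Suppressed:" section of the printed stack trace.
    return transferFailure;
  }

  public static void main(String[] args) {
    reportedFailure().printStackTrace();
  }
}
```

One plausible reason it adds little here: the retry failure is already recorded by the `logger.error` call in the diff.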



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -177,20 +178,27 @@ private void transferAllOutstanding() {
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
-        initiateRetry(e);
-      } else {
-        for (String bid : blockIdsToTransfer) {
-          listener.onBlockTransferFailure(bid, e);
+        try {
+          initiateRetry(e);
+          return;
+        } catch (Throwable t) {
+          logger.error("Exception while trying to initiate retry", t);
         }
       }
+
+      // retry is not possible, so fail remaining blocks
+      for (String bid : blockIdsToTransfer) {
+        listener.onBlockTransferFailure(bid, e);
+      }
     }
   }
 
   /**
    * Lightweight method which initiates a retry in a different thread. The retry will involve
    * calling transferAllOutstanding() after a configured wait time.
    */
-  private synchronized void initiateRetry(Throwable e) {
+  @VisibleForTesting
+  public synchronized void initiateRetry(Throwable e) {

Review Comment:
   ```suggestion
     synchronized void initiateRetry(Throwable e) {
   ```



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -365,13 +365,32 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
             ImmutableMap.of("b0", block0)
     );
     configMap.put("spark.shuffle.sasl.enableRetries", "true");
-    performInteractions(interactions, listener);
+    performInteractions(interactions, listener, false);
     verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
     verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
     assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
   }
 
+  @Test
+  public void testRetryInitiationFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // IOException will initiate a retry, but the initiation will fail
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new IOException("Connection failed or something"))
+            .put("b1", block1)
+            .build()
+    );
+
+    performInteractions(interactions, listener, true);

Review Comment:
   Using the proposed changes below, this will become:
   
   ```suggestion
       configureInteractions(interactions, listener);
       _retryingBlockTransferor = spy(_retryingBlockTransferor);
       // Throw an OOM when initiating retries.
       doThrow(OutOfMemoryError.class).when(_retryingBlockTransferor).initiateRetry(any());
       // Override listener, so that it delegates to the spied instance and not the original class.
       _retryingBlockTransferor.setCurrentListener(_retryingBlockTransferor.new RetryingBlockTransferListener());
       _retryingBlockTransferor.start();
   ```



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -80,7 +80,7 @@ public void testNoFailures() throws IOException, InterruptedException {
         .build()
       );
 
-    performInteractions(interactions, listener);
+    performInteractions(interactions, listener, false);

Review Comment:
   Let us revert all these `performInteractions`-related diffs; more below.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -381,9 +400,12 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
    * If multiple interactions are supplied, they will be used in order. This is useful for encoding
    * retries -- the first interaction may include an IOException, which causes a retry of some
    * subset of the original blocks in a second interaction.
+   *
+   * If mockInitiateRetryFailure is set to true, we mock initiateRetry() and throw an exception.
    */
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
-                                          BlockFetchingListener listener)
+                                          BlockFetchingListener listener,
+                                          boolean mockInitiateRetryFailure)

Review Comment:
   Split `performInteractions` into `configureInteractions` and `performInteractions`.
   
   `configureInteractions` is the existing `performInteractions` in master, minus the `_retryingBlockTransferor.start();` call.
   `performInteractions` will then be:
   ```
     private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                             BlockFetchingListener listener)
       throws IOException, InterruptedException {
       configureInteractions(interactions, listener);
       _retryingBlockTransferor.start();
     }
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -149,6 +149,7 @@ public RetryingBlockTransferor(
    * in the event of transient IOExceptions.
    */
   public void start() {
+    currentListener = new RetryingBlockTransferListener();

Review Comment:
   Instead, let us explicitly introduce a package-private setter for use in tests, and remove this change:
   ```
   @VisibleForTesting
   synchronized void setCurrentListener(RetryingBlockTransferListener listener) {
     this.currentListener = listener;
   }
   ```
   
   See the comments on the Suite for how to use it.

[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1678717683

   Hi @mridulm, would you mind reviewing this PR when you have time? Any feedback would be highly appreciated. Thanks.

[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1685547323

   > To make sure I understand correctly - there is an OOM which is thrown, which happens to be within `initiateRetry` and so shuffle fetch stalled indefinitely, and so task appeared to be hung ?
   
   Yes, correct. To be more precise, it's not actually a memory issue, as the log shows: the error is "unable to create new native thread". We suspect the executor is running out of either threads or file descriptors and is therefore unable to spawn a new thread at `executorService.submit()` in `RetryingBlockTransferor#initiateRetry()`.
   
   
   > In meantime, you can simply run with `-XX:OnOutOfMemoryError` to kill the executor in case of OOM if this is blocking you ? This is what Spark on Yarn does (see `YarnSparkHadoopUtil.addOutOfMemoryErrorArgument`) - looks like this is not done in other resource managers.
   
   Actually, we are running Spark on YARN and already have `-XX:OnOutOfMemoryError='kill %p'` in place. But it does not seem to work, probably because the JVM is "unable to create new native thread" and therefore cannot even spawn `kill`.
   
   Instead, we have enabled `spark.speculation` to proactively kill any stuck tasks.

[GitHub] [spark] mridulm commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1333300336


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -201,10 +210,17 @@ private synchronized void initiateRetry(Throwable e) {
       listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(),
       retryWaitTime);
 
-    executorService.submit(() -> {
-      Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
-      transferAllOutstanding();
-    });
+    try {
+      executorService.submit(() -> {

Review Comment:
   Let us replace it with `execute` instead.
   In the future, we will add a checkstyle rule to prevent `submit` use in general and replace it with `execute`, but that is out of scope for this PR.
   
   Note: this PR addresses the issue of task submission itself failing, not failures after submission, so changing to `execute` won't fix the current problem; but let us clean this up incrementally.

[GitHub] [spark] Ngone51 commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1328912310


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -274,7 +287,13 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
       synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
           if (shouldRetry(exception)) {
-            initiateRetry(exception);
+            try {
+              initiateRetry(exception);
+            } catch (Throwable t) {
+              logger.error("Exception while trying to initiate retry", t);

Review Comment:
   Could you inline this common try-catch handler into `initiateRetry`?
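Inlining the handler into `initiateRetry`, combined with the boolean-return idea raised by mridulm in the same thread, could look roughly like the sketch below. It is a hypothetical, heavily simplified class (`RetryFallbackSketch`, `handleRetryableFailure` and `failedBlocks` are invented names), not the merged Spark code; `System.err` stands in for the logger and a `BiConsumer` for the parent listener.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Hypothetical transferor: boolean-returning initiateRetry() plus a fail-fast fallback. */
public class RetryFallbackSketch {
  private final Executor executor;
  private final BiConsumer<String, Throwable> onBlockFailure; // stands in for the parent listener
  private final Set<String> outstanding;

  RetryFallbackSketch(Executor executor, BiConsumer<String, Throwable> onBlockFailure,
      List<String> blockIds) {
    this.executor = executor;
    this.onBlockFailure = onBlockFailure;
    this.outstanding = new LinkedHashSet<>(blockIds);
  }

  /** Returns true if a retry was scheduled, false if scheduling itself failed. */
  synchronized boolean initiateRetry(Throwable cause) {
    try {
      executor.execute(() -> { /* a real implementation would wait, then re-fetch */ });
      return true;
    } catch (Throwable t) {
      System.err.println("Exception while trying to initiate retry: " + t);
      return false;
    }
  }

  /** Called on a retryable failure; never leaves outstanding blocks unreported. */
  synchronized void handleRetryableFailure(Throwable cause) {
    if (initiateRetry(cause)) {
      return;
    }
    // Retry is not possible, so fail the remaining blocks instead of hanging.
    for (String blockId : outstanding) {
      onBlockFailure.accept(blockId, cause);
    }
    outstanding.clear();
  }

  /** Runs one retryable failure through the sketch; returns the block ids reported as failed. */
  static List<String> failedBlocks(Executor executor) {
    List<String> failed = new ArrayList<>();
    RetryFallbackSketch transferor = new RetryFallbackSketch(
        executor, (id, e) -> failed.add(id), Arrays.asList("b0", "b1"));
    transferor.handleRetryableFailure(new IOException("Connection failed (simulated)"));
    return failed;
  }

  public static void main(String[] args) {
    Executor working = Runnable::run;
    Executor failing = r -> {
      throw new OutOfMemoryError("unable to create new native thread (simulated)");
    };
    System.out.println(failedBlocks(working)); // []
    System.out.println(failedBlocks(failing)); // [b0, b1]
  }
}
```

Keeping the try/catch inside `initiateRetry` means every caller gets the same contract: either a retry is scheduled, or the caller is told it was not and must fail the blocks itself.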

[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1727835491

   > I think `SparkUncaughtExceptionHandler` should caught this OOM exception and abort the executor.
   
   I'm not sure I follow. In this particular case, the OOM was actually caught at [`AbstractChannelHandlerContext.invokeExceptionCaught`](https://github.com/netty/netty/blob/netty-4.1.74.Final/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L304), so it was not an uncaught exception.
   
   Also, if I understand the previous discussion in https://github.com/apache/spark/pull/37779 correctly, the problem with `submit()` is that it swallows uncaught exceptions thrown from _inside_ a spawned thread, while, here, the OOM was thrown within `submit()` itself even before a thread was spawned.
   
   > ```
   > 	at java.lang.Thread.start0(Native Method)
   > 	at java.lang.Thread.start(Thread.java:719)
   > 	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
   > 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
   > 	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   > ```
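This distinction can be checked in isolation: when the pool cannot start a worker thread, the error is thrown synchronously from `submit()` in the caller's thread, before any task runs, so neither a `Future` nor an uncaught-exception handler ever sees it. A minimal sketch that simulates the failure with a `ThreadFactory` that throws; that simulation is an assumption, since the real error in the trace above comes from `Thread.start0`. `SubmitThrowsDemo` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubmitThrowsDemo {

  /**
   * Submits a task to a pool that cannot create threads. Returns the Throwable thrown
   * synchronously by submit(), or null if submit() returned normally.
   */
  static Throwable trySubmit(AtomicBoolean taskRan) {
    // Simulated failure: a real "unable to create new native thread" comes from
    // Thread.start(), but it also propagates out of addWorker()/execute()/submit().
    ThreadFactory failingFactory = r -> {
      throw new OutOfMemoryError("unable to create new native thread (simulated)");
    };
    ExecutorService pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), failingFactory);
    try {
      pool.submit(() -> taskRan.set(true));
      return null;
    } catch (Throwable t) {
      return t; // thrown in the caller's thread; no Future is ever returned
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    AtomicBoolean ran = new AtomicBoolean(false);
    Throwable thrown = trySubmit(ran);
    System.out.println("submit() threw: " + thrown);
    System.out.println("task ran: " + ran.get()); // false
  }
}
```

Because `submit()` delegates to `execute()`, switching to `execute()` alone would not change this path; only a try/catch around the call does.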

[GitHub] [spark] Ngone51 commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "Ngone51 (via GitHub)" <gi...@apache.org>.
Ngone51 commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1732956928

   > ...the OOM was thrown within submit() itself even before a thread was spawned.
   
   @hdaikoku You're right. The issue is different here.

[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1702324957

   No worries, we've already applied this internally and are good now. Just wanted to contribute it back to the community.
   No rush 🙂 

[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1679913705

   +CC @otterc 

[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1712829719

   +CC @Ngone51 

[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1714124374

   > @hdaikoku Do you know why the exception is not forwarded to `TransportChannelHandler#exceptionCaught()`?
   
   @Ngone51 As I understand, we have a pipeline of handlers here: https://github.com/apache/spark/blob/94de3ca2942bb04852510abccf06df1fa8b2dab3/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java#L196-L204
   
   ```
   encoder -- (channelRead) --> TransportFrameDecoder -- (channelRead) --> decoder -- (channelRead) --> idleStateHandler -- (channelRead) --> TransportChannelHandler
   ```
   
   And the original exception was raised in `TransportFrameDecoder` in the pipeline
   ```
   java.io.IOException: Cannot allocate memory
   	...
   	at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
   	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   ```
   
   Hence the corresponding exception handler in `TransportFrameDecoder` was invoked [here](https://github.com/netty/netty/blob/netty-4.1.74.Final/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L302), which eventually raised another exception (`OutOfMemoryError`)
   ```
   23/08/09 16:58:53 shuffle-client-4-2 DEBUG AbstractChannelHandlerContext: An exception java.lang.OutOfMemoryError: unable to create new native thread
   	at java.lang.Thread.start0(Native Method)
   	at java.lang.Thread.start(Thread.java:719)
   	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
   	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
   	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
   	at org.apache.spark.network.shuffle.RetryingBlockTransferor.initiateRetry(RetryingBlockTransferor.java:182)
   	at org.apache.spark.network.shuffle.RetryingBlockTransferor.access$500(RetryingBlockTransferor.java:43)
   	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferFailure(RetryingBlockTransferor.java:230)
   	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchFailure(RetryingBlockTransferor.java:260)
   	at org.apache.spark.network.shuffle.OneForOneBlockFetcher.failRemainingBlocks(OneForOneBlockFetcher.java:318)
   	at org.apache.spark.network.shuffle.OneForOneBlockFetcher.access$300(OneForOneBlockFetcher.java:55)
   	at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onFailure(OneForOneBlockFetcher.java:357)
   	at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:56)
   	at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:231)
   	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
   ```
   
   So I don't think any other handlers in the pipeline are involved in the exception handling process.
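The resulting hang can be modeled without Netty. In the sketch below, `invokeExceptionCaught` mimics what the DEBUG line above suggests Netty's `AbstractChannelHandlerContext` does: an error thrown from a handler's `exceptionCaught()` is logged rather than propagated. Everything here (`Handler`, `taskNotified`, and the latch standing in for the fetch result the task waits on) is a hypothetical simplification, not Spark or Netty code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Netty-free model of an error escaping a handler's exceptionCaught() callback. */
public class SwallowedHandlerErrorModel {

  interface Handler {
    void exceptionCaught(Throwable cause) throws Exception;
  }

  /** Models the pipeline: an error thrown by exceptionCaught() is logged, not rethrown. */
  static void invokeExceptionCaught(Handler handler, Throwable cause, List<String> debugLog) {
    try {
      handler.exceptionCaught(cause);
    } catch (Throwable error) {
      debugLog.add("An exception " + error + " was thrown by a user handler's exceptionCaught()");
    }
  }

  /** Stands in for initiateRetry() failing to spawn a thread. */
  static void initiateRetry() {
    throw new OutOfMemoryError("unable to create new native thread (simulated)");
  }

  /**
   * Returns true if the waiting task learns about the failure within timeoutMs.
   * withFix=true catches the retry failure and reports the block as failed instead.
   */
  static boolean taskNotified(boolean withFix, long timeoutMs, List<String> debugLog) {
    CountDownLatch blockResult = new CountDownLatch(1); // what the fetching task waits on
    Handler decoder = cause -> {
      try {
        initiateRetry();
      } catch (Throwable t) {
        if (!withFix) {
          throw t; // escapes to the pipeline, which only logs it
        }
        blockResult.countDown(); // fix: report the failure to the listener
      }
    };
    invokeExceptionCaught(decoder, new IOException("Cannot allocate memory (simulated)"), debugLog);
    try {
      return blockResult.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    System.out.println("without fix, notified: " + taskNotified(false, 200, log)); // false
    System.out.println("swallowed: " + log);
    System.out.println("with fix, notified: " + taskNotified(true, 200, new ArrayList<>())); // true
  }
}
```

In the real executor there is no timeout on this wait, which is why the task appears to hang indefinitely rather than fail.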

[GitHub] [spark] hdaikoku commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1333414112


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -201,10 +210,17 @@ private synchronized void initiateRetry(Throwable e) {
       listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(),
       retryWaitTime);
 
-    executorService.submit(() -> {
-      Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
-      transferAllOutstanding();
-    });
+    try {
+      executorService.submit(() -> {

Review Comment:
   Sure: [79c48a7](https://github.com/apache/spark/pull/42426/commits/79c48a7639bb6d8cd5cf94bdf7fdf5a8a0d8606b)



[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1735853527

   Merged to master.
   Thanks for fixing this @hdaikoku !
   Thanks for the review @Ngone51 :-)


[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1714120090

   @mridulm Thank you so much for your feedback! Refactored the code accordingly.


[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1702149439

   Apologies for the delay in getting to this, @hdaikoku; it is a nontrivial change, and unfortunately I got a bit swamped at work.
   I will circle back to it soon.


[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1685199364

   To make sure I understand correctly: an OOM is thrown, it happens to occur within `initiateRetry`, and so the shuffle fetch stalls? I have not looked in detail at whether this is possible, but given that this is an OOM and it can be thrown anywhere, mitigating it would require careful analysis.
   
   In the meantime, if this is blocking you, you can simply run with `-XX:OnOutOfMemoryError` to kill the executor on OOM.
   This is what Spark on YARN does (see `YarnSparkHadoopUtil.addOutOfMemoryErrorArgument`); it looks like this is not done for other resource managers.
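For reference, here is a minimal Java sketch of that workaround. It assumes the executor's JVM options are assembled as a list of strings before launch. `OomKillFlag` and `withKillOnOom` are hypothetical names. The sketch only mirrors what the comment says `YarnSparkHadoopUtil.addOutOfMemoryErrorArgument` does (add the flag unless one is already set) and is not Spark's implementation. HotSpot substitutes the JVM's process ID for `%p` in the `-XX:OnOutOfMemoryError` command.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, loosely modelled on what the comment says
// YarnSparkHadoopUtil.addOutOfMemoryErrorArgument does; not Spark's code.
class OomKillFlag {
  // HotSpot replaces %p with the JVM's PID; the single quotes assume the options
  // end up on a POSIX shell command line.
  static final String KILL_ON_OOM = "-XX:OnOutOfMemoryError='kill %p'";

  // Appends the kill-on-OOM flag unless the user already configured one.
  static List<String> withKillOnOom(List<String> javaOpts) {
    List<String> result = new ArrayList<>(javaOpts);
    boolean alreadySet = result.stream().anyMatch(o -> o.contains("-XX:OnOutOfMemoryError"));
    if (!alreadySet) {
      result.add(KILL_ON_OOM);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> opts = withKillOnOom(Arrays.asList("-Xmx4g"));
    System.out.println(String.join(" ", opts));
    // prints: -Xmx4g -XX:OnOutOfMemoryError='kill %p'
  }
}
```

Outside YARN, the equivalent would presumably be to add the flag to `spark.executor.extraJavaOptions`. How the quotes around `kill %p` survive depends on how the launcher passes the options, so that is worth checking per deployment.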


[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1694723882

   Thanks for clarifying, @hdaikoku; let me take a look at this PR this week.


[GitHub] [spark] mridulm commented on a diff in pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42426:
URL: https://github.com/apache/spark/pull/42426#discussion_r1330362915


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -274,7 +287,13 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
       synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
           if (shouldRetry(exception)) {
-            initiateRetry(exception);
+            try {
+              initiateRetry(exception);
+            } catch (Throwable t) {
+              logger.error("Exception while trying to initiate retry", t);

Review Comment:
   Do you mean pushing that into `initiateRetry` and returning a boolean (or some such) to indicate failure?
   Something like:
   
   ```java
     if (initiateRetry(exception)) {
       return;
     } else {
       outstandingBlocksIds.remove(blockId);
       shouldForwardFailure = true;
     }
   ```
   
   And doing a `try/catch` on `Throwable` around `initiateRetry`?
   
   (I misread your response, @Ngone51, hence the code snippet above for clarity :) )
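The suggestion can be sketched in Java as follows. This is a heavily simplified, hypothetical model, not Spark's `RetryingBlockTransferor`. The class `RetrySketch`, the `BiConsumer` parent callback, the no-op `transferAllOutstanding` and the "retry only `IOException`s" policy are all assumptions for illustration. `initiateRetry` returns `false` when handing the retry task to the executor throws. The caller then follows the snippet in the comment: it removes the block and forwards the failure to the parent, doing the forwarding outside the lock.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical, simplified model of the retry bookkeeping; not Spark code.
class RetrySketch {
  private final Set<String> outstandingBlocksIds = new LinkedHashSet<>();
  private final ExecutorService executorService;
  private final int maxRetries;
  private final BiConsumer<String, Throwable> parentOnFailure;
  private int retryCount = 0;

  RetrySketch(ExecutorService executorService, int maxRetries,
      BiConsumer<String, Throwable> parentOnFailure, String... blockIds) {
    this.executorService = executorService;
    this.maxRetries = maxRetries;
    this.parentOnFailure = parentOnFailure;
    Collections.addAll(outstandingBlocksIds, blockIds);
  }

  // Assumed retry policy for the sketch: only IOExceptions, up to maxRetries times.
  private synchronized boolean shouldRetry(Throwable e) {
    return e instanceof IOException && retryCount < maxRetries;
  }

  // Returns true only if the retry task was actually handed to the executor.
  private synchronized boolean initiateRetry(Throwable e) {
    retryCount++;
    try {
      executorService.submit(this::transferAllOutstanding);
      return true;
    } catch (Throwable t) {
      // e.g. RejectedExecutionException once the executor has been shut down
      System.err.println("Exception while trying to initiate retry: " + t);
      return false;
    }
  }

  private void transferAllOutstanding() {
    // Placeholder: a real transferor would re-request outstandingBlocksIds here.
  }

  synchronized boolean isOutstanding(String blockId) {
    return outstandingBlocksIds.contains(blockId);
  }

  void handleBlockTransferFailure(String blockId, Throwable exception) {
    boolean shouldForwardFailure = false;
    synchronized (this) {
      if (outstandingBlocksIds.contains(blockId)) {
        if (shouldRetry(exception) && initiateRetry(exception)) {
          return; // retry scheduled: the block stays outstanding
        }
        // Not retryable, or scheduling the retry failed: give up on this block.
        outstandingBlocksIds.remove(blockId);
        shouldForwardFailure = true;
      }
    }
    // Forward outside the lock so the parent is told about every block it will not get.
    if (shouldForwardFailure) {
      parentOnFailure.accept(blockId, exception);
    }
  }

  public static void main(String[] args) {
    List<String> failed = new ArrayList<>();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.shutdown(); // any later submit() is rejected
    RetrySketch t = new RetrySketch(pool, 3, (id, e) -> failed.add(id), "shuffle_0_1_2");
    t.handleBlockTransferFailure("shuffle_0_1_2", new IOException("connection reset"));
    System.out.println(failed); // [shuffle_0_1_2]
  }
}
```

Running `main` prints `[shuffle_0_1_2]` (plus the logged rejection on stderr). The shut-down executor rejects the retry, and the failure is forwarded instead of being dropped.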



[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1724557591

   Very good callout, @Ngone51; we should probably also add a checkstyle error to prevent its usage.


[GitHub] [spark] hdaikoku commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry

Posted by "hdaikoku (via GitHub)" <gi...@apache.org>.
hdaikoku commented on PR #42426:
URL: https://github.com/apache/spark/pull/42426#issuecomment-1735906844

   Thank you so much for reviewing and merging this @mridulm @Ngone51 🙏🏻

