Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/07 05:57:40 UTC

[GitHub] [spark] akpatnam25 opened a new pull request, #38959: [WIP] SPARK-41415: SASL Request Retries

akpatnam25 opened a new pull request, #38959:
URL: https://github.com/apache/spark/pull/38959

   
   
   ### What changes were proposed in this pull request?
   Add the ability to retry SASL requests. A metric to track SASL request retries will be added soon.


   ### Why are the changes needed?
   We are seeing increased SASL timeouts internally, and this change would mitigate the issue. We already have this feature enabled for our 2.3 jobs.


   ### Does this PR introduce _any_ user-facing change?
   No


   ### How was this patch tested?
   Added some unit tests.
   




[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063957738


##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         }
         handleResult(PushResult(blockId, exception))
       }
+
+      override def onSaslTimeout(): Unit = {
+        TaskContext.get().taskMetrics().incSaslRequestRetries(1)

Review Comment:
   Updating metrics once the task has terminated does not help - since they are not propagated to driver.
   One option would be to rely on `ExecutorMetricsSource` ?
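   Not the Spark `ExecutorMetricsSource` API, but a minimal Dropwizard-metrics sketch of the kind of executor-level counter being suggested, since task-level metrics are not propagated once the task has terminated (all names here are assumptions for illustration):
   ```java
   // Illustrative only: an executor-scoped counter that outlives individual tasks.
   import com.codahale.metrics.Counter;
   import com.codahale.metrics.MetricRegistry;

   public final class SaslRetryMetricsSketch {
     private final MetricRegistry registry = new MetricRegistry();
     private final Counter saslRequestRetries = registry.counter("saslRequestRetries");

     // Called from the retry path; the count survives task completion because it
     // lives on the executor rather than in TaskMetrics.
     public void incSaslRequestRetries() {
       saslRequestRetries.inc();
     }

     public long saslRequestRetryCount() {
       return saslRequestRetries.getCount();
     }
   }
   ```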





[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1372609641

   Re-running this to see if the linter errors are just transient. They seem unrelated to this PR, as there are no Python-related changes in it.




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1382693225

   Can you fix the test failure, @akpatnam25? Thx




[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070072578


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +243,50 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   done





[GitHub] [spark] rmcyang commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
rmcyang commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070099546


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -192,8 +200,14 @@ private synchronized void initiateRetry() {
   private synchronized boolean shouldRetry(Throwable e) {

Review Comment:
   Should the doc above be updated, since we will also retry for SaslTimeoutException?



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -20,13 +20,18 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import java.util.concurrent.TimeoutException;

Review Comment:
   nit: move this import statement to the group above





[GitHub] [spark] mridulm closed pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm closed pull request #38959: SPARK-41415: SASL Request Retries
URL: https://github.com/apache/spark/pull/38959




[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063830324


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);
+      throw Throwables.propagate(new SaslTimeoutException(e));

Review Comment:
   `RpcTimeoutException` is in the core module and this is in common. Will just do:
   ```
   throw Throwables.propagate(e.getCause());
   ```
   like the rest of the catches to avoid making it too complicated for now. 





[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070469083


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import com.google.common.annotations.VisibleForTesting;

Review Comment:
   done



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -26,6 +27,7 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.network.sasl.SaslTimeoutException;

Review Comment:
   done





[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063830324


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);
+      throw Throwables.propagate(new SaslTimeoutException(e));

Review Comment:
   `RpcTimeoutException` is in the core module and this is in common. Will just do:
   ```
   throw Throwables.propagate(e.getCause());
   ```
   to avoid making it too complicated for now. 





[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1067516221


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -338,6 +342,15 @@ public String toString() {
       .toString();
   }
 
+  /**
+   * Exception thrown when sasl request times out.
+   */
+  public static class SaslTimeoutException extends RuntimeException {
+    public SaslTimeoutException(Throwable cause) {
+      super((cause));
+    }
+  }

Review Comment:
   Remove this ?



##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -294,6 +294,15 @@ public void onFailure(Throwable e) {
     }
   }
 
+  /**
+   * Exception thrown when sasl request times out.
+   */
+  public static class SaslTimeoutException extends RuntimeException {
+    public SaslTimeoutException(Throwable cause) {
+      super((cause));
+    }
+  }
+

Review Comment:
   Move this to ` org.apache.spark.network.sasl` package.
   Also add the common constructors for an Exception - `SaslTimeoutException()`, `SaslTimeoutException(String)`, `SaslTimeoutException(String, Throwable)`, in addition to what we have here.
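   A minimal sketch of what the relocated exception could look like, assuming only the package and constructor set requested above (not necessarily the merged code):
   ```java
   package org.apache.spark.network.sasl;

   /** Thrown when a SASL request times out; sketch with the usual constructor set. */
   public class SaslTimeoutException extends RuntimeException {
     public SaslTimeoutException() {
       super();
     }

     public SaslTimeoutException(String message) {
       super(message);
     }

     public SaslTimeoutException(String message, Throwable cause) {
       super(message, cause);
     }

     public SaslTimeoutException(Throwable cause) {
       super(cause);
     }
   }
   ```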



##########
common/network-common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java:
##########
@@ -65,9 +67,18 @@ public void doBootstrap(TransportClient client, Channel channel) {
         SaslMessage msg = new SaslMessage(appId, payload);
         ByteBuf buf = Unpooled.buffer(msg.encodedLength() + (int) msg.body().size());
         msg.encode(buf);
+        ByteBuffer response;
         buf.writeBytes(msg.body().nioByteBuffer());
-
-        ByteBuffer response = client.sendRpcSync(buf.nioBuffer(), conf.authRTTimeoutMs());
+        try {
+          response = client.sendRpcSync(buf.nioBuffer(), conf.authRTTimeoutMs());
+        } catch (RuntimeException ex) {
+          // We know it is a Sasl timeout here if it is a TimeoutException.
+          if (ex.getCause() instanceof TimeoutException) {
+            throw Throwables.propagate(new TransportClient.SaslTimeoutException(ex.getCause()));

Review Comment:
   Why `Throwables.propagate` ? Simply create and throw the new `SaslTimeoutException` directly ?
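   A sketch of the direct throw being suggested, written against the catch block quoted above (surrounding code may differ):
   ```java
   try {
     response = client.sendRpcSync(buf.nioBuffer(), conf.authRTTimeoutMs());
   } catch (RuntimeException ex) {
     // We know it is a SASL timeout here if the cause is a TimeoutException,
     // so throw the specific exception directly instead of wrapping it in
     // Throwables.propagate.
     if (ex.getCause() instanceof TimeoutException) {
       throw new SaslTimeoutException(ex.getCause());
     }
     throw ex;
   }
   ```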



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -192,8 +197,12 @@ private synchronized void initiateRetry() {
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
+    boolean isSaslTimeout = enableSaslRetries &&
+        (e instanceof TransportClient.SaslTimeoutException ||
+            (e.getCause() != null && e.getCause() instanceof TransportClient.SaslTimeoutException));
     boolean hasRemainingRetries = retryCount < maxRetries;
-    return isIOException && hasRemainingRetries && errorHandler.shouldRetryError(e);
+    return (isSaslTimeout || isIOException) &&
+        hasRemainingRetries && errorHandler.shouldRetryError(e);

Review Comment:
   If we had sasl failures which triggered a retry - and we finally succeeded - we should reset `retryCount` to `0`.
   sasl is part of bootstrap, unlike other failures we retry for.
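   A rough sketch of the reset being asked for; the flag and method names are assumptions, not the merged code:
   ```java
   // Illustrative flag and helper inside RetryingBlockTransferor; retryCount is
   // the existing field discussed above.
   private boolean saslRetriedForCurrentRequest = false;

   private synchronized void resetRetryCountAfterSaslSuccess() {
     if (saslRetriedForCurrentRequest) {
       // SASL is part of connection bootstrap: once it finally succeeds, earlier
       // SASL timeouts should not count against the block-transfer retry budget.
       retryCount = 0;
       saslRetriedForCurrentRequest = false;
     }
   }
   ```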



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -192,8 +197,12 @@ private synchronized void initiateRetry() {
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
+    boolean isSaslTimeout = enableSaslRetries &&
+        (e instanceof TransportClient.SaslTimeoutException ||
+            (e.getCause() != null && e.getCause() instanceof TransportClient.SaslTimeoutException));

Review Comment:
   With the proposed changes, this will simply become:
   ```suggestion
           (e instanceof TransportClient.SaslTimeoutException);
   ```





[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1383065883

   If we want to backport to other branches, we might want to create new PRs.
   @dongjoon-hyun, thoughts on getting this added to 3.3 and 3.2?




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1383086247

   Thanks @dongjoon-hyun !
   Can you create PRs for the 3.3 and 3.2 branches as well by backporting this, @akpatnam25? Thanks.




[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1069084937


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -192,8 +197,12 @@ private synchronized void initiateRetry() {
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
+    boolean isSaslTimeout = enableSaslRetries &&
+        (e instanceof TransportClient.SaslTimeoutException ||
+            (e.getCause() != null && e.getCause() instanceof TransportClient.SaslTimeoutException));
     boolean hasRemainingRetries = retryCount < maxRetries;
-    return isIOException && hasRemainingRetries && errorHandler.shouldRetryError(e);
+    return (isSaslTimeout || isIOException) &&
+        hasRemainingRetries && errorHandler.shouldRetryError(e);

Review Comment:
   I had to use a map to track the context in which we want to reset `retryCount` to 0, since we only want to do that when the exception for that blockId is a SaslTimeoutException. The map is only accessed and mutated within synchronized blocks, which ensures we only reset `retryCount` for SaslTimeoutException retries, not IOException retries.





[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063031369


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);
+      throw Throwables.propagate(new SaslTimeoutException(e));

Review Comment:
   `SaslTimeoutException` -> `RpcTimeoutException` ? (there is one in `org.apache.spark.rpc`)
   
   This API is not specific to SASL - we cannot assume a timeout for a sync send is due to a SASL timeout.



##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         }
         handleResult(PushResult(blockId, exception))
       }
+
+      override def onSaslTimeout(): Unit = {
+        TaskContext.get().taskMetrics().incSaslRequestRetries(1)

Review Comment:
   This is not within the task thread - you should be getting an NPE, as the task context should be null.
   
   If the tests are not failing, we have to make sure we are testing this fix appropriately - some codepaths (like the push-related suites) might not be getting tested with this?



##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -287,6 +288,9 @@ public void onFailure(Throwable e) {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.warn("RPC {} timed-out", rpcId);

Review Comment:
   This should be at `trace` level - `debug` at best



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -245,8 +269,9 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
     throws IOException, InterruptedException {
 
     MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
-      "spark.shuffle.io.maxRetries", "2",
-      "spark.shuffle.io.retryWait", "0"));
+        "spark.shuffle.io.maxRetries", "2",
+        "spark.shuffle.io.retryWait", "0",
+        "spark.shuffle.sasl.enableRetries", "true"));

Review Comment:
   nit: fix indentation.
   
   Also, we want to make sure tests are handling the `spark.shuffle.sasl.enableRetries = false` case correctly as well.
   You can extend the test and specify the value of enableRetries = true there.
   
   See `LevelDBHybridStoreSuite` as an example



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java:
##########
@@ -41,4 +41,6 @@ public interface BlockTransferListener extends EventListener {
    * Return a string indicating the type of the listener such as fetch, push, or something else
    */
   String getTransferType();
+
+  void onSaslTimeout();

Review Comment:
   Add default impl for it
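   For illustration, the requested no-op default on the interface could look like:
   ```java
   // Default no-op so existing BlockTransferListener implementations keep
   // compiling without overriding the new callback.
   default void onSaslTimeout() {
     // no-op by default
   }
   ```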



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -289,5 +301,11 @@ public String getTransferType() {
       throw new RuntimeException(
         "Invocation on RetryingBlockTransferListener.getTransferType is unexpected.");
     }
+
+    @Override
+    public void onSaslTimeout() {
+      throw new RuntimeException(

Review Comment:
   `RuntimeException` -> `IllegalStateException` or something specific





[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1347832052

   @otterc @mridulm Removing the WIP tag from this PR. This should be good to review now.




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1383006361

   Missed out on the imports - once the build succeeds, I will merge it.
   Based on current test progress, it looks like all the tests are working fine (just taking time in the SQL slow tests).




[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070170721


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   retryCount is not exposed from `RetryingBlockTransferor`. 
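   One assumed way to expose it for that assertion would be a test-only accessor on `RetryingBlockTransferor` (the method name is hypothetical):
   ```java
   // Hypothetical test-only accessor; VisibleForTesting is already imported in
   // this class per the import discussion above.
   @VisibleForTesting
   public synchronized int getRetryCount() {
     return retryCount;
   }
   ```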



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1374386300

   > I think metric change is not complete. I don't see the change in the rest api and many other places.
   
   Agree, this should be WIP.




[GitHub] [spark] AmplabJenkins commented on pull request #38959: [WIP] SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1343150583

   Can one of the admins verify this patch?




[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1068998144


##########
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java:
##########
@@ -338,6 +342,15 @@ public String toString() {
       .toString();
   }
 
+  /**
+   * Exception thrown when sasl request times out.
+   */
+  public static class SaslTimeoutException extends RuntimeException {
+    public SaslTimeoutException(Throwable cause) {
+      super((cause));
+    }
+  }

Review Comment:
   outdated/old comment :)





[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1069990003


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +243,50 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   Add a test where repeated SASL timeout failures result in failing the request - to make sure there are no regressions from the current codebase (when the flag is enabled).





[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1382436350

   I will merge it once @rmcyang's comments are addressed.




[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070467917


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import com.google.common.annotations.VisibleForTesting;

Review Comment:
   Not sure why the linter did not catch this - move it to the third-party section of imports (along with the existing com.google imports).



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -26,6 +27,7 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.network.sasl.SaslTimeoutException;

Review Comment:
   `org.apache` imports come last.
   See [here](https://github.com/databricks/scala-style-guide#imports) for details.





[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1069860734


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -84,6 +87,12 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
   // while inside a synchronized block.
   /** Number of times we've attempted to retry so far. */
   private int retryCount = 0;
+  /**
+   * Map to track blockId to exception that the block is being retried for.
+   * This is mainly used in the case of SASL retries, because we need to set
+   * `retryCount` back to 0 in those cases.
+   */
+  private Map<String, Throwable> blockIdToException;

Review Comment:
   Yes, added this. This would be simpler.





[GitHub] [spark] otterc commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063849144


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java:
##########
@@ -46,4 +46,7 @@ default void onBlockTransferFailure(String blockId, Throwable exception) {
   default String getTransferType() {
     return "fetch";
   }
+
+  @Override
+  default void onSaslTimeout() {}

Review Comment:
   If we have this in BlockTransferListener, then we don't need to re-define it here.



##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         }
         handleResult(PushResult(blockId, exception))
       }
+
+      override def onSaslTimeout(): Unit = {
+        TaskContext.get().taskMetrics().incSaslRequestRetries(1)

Review Comment:
   In our internal fork, ShuffleBlockPusher is created with the `writeMetrics`. We can change the constructor to use `taskMetrics`. However, the concern then is that the push happens outside the task thread, so it is unclear whether the metric would be correct. One way to move forward is to make this metric count SASL retries only when shuffle data is fetched, not pushed.





[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1382447740

   Addressed @rmcyang's comments; will monitor the build. I have been getting some linter errors from the Python side that are unrelated to this PR.




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1382906124

   Also, please update to latest master




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1347874874

   It is not clear to me why we need the protocol change - why not simply recreate a new socket connection?




[GitHub] [spark] akpatnam25 commented on pull request #38959: [WIP] SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1340410484

   cc @mridulm  @otterc @zhouyejoe 




[GitHub] [spark] mridulm commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1383065956

   Merged to master.
   Thanks for working on this @akpatnam25 !
   Thanks for the reviews @otterc, @rmcyang :-)




[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1383009033

   Sounds good, thanks for following up even on the weekend, @mridulm!




[GitHub] [spark] otterc commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070168146


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   We should verify in the tests that the retry count is reset once the SASL request is successful.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -187,13 +195,20 @@ private synchronized void initiateRetry() {
 
   /**
    * Returns true if we should retry due a block transfer failure. We will retry if and only if
-   * the exception was an IOException and we haven't retried 'maxRetries' times already.
+   * the exception was an IOException or SaslTimeoutException and we haven't retried
+   * 'maxRetries' times already.
    */
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
+    boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
     boolean hasRemainingRetries = retryCount < maxRetries;
-    return isIOException && hasRemainingRetries && errorHandler.shouldRetryError(e);
+    boolean shouldRetry =  (isSaslTimeout || isIOException) &&
+        hasRemainingRetries && errorHandler.shouldRetryError(e);
+    if (shouldRetry && isSaslTimeout) {
+      this.isCurrentSaslTimeout = true;
+    }

Review Comment:
   Should we not reset `isCurrentSaslTimeout=false` and `retryCount=0` when there is an exception but it's not a `SaslTimeoutException`?
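   A sketch of that reset against the `shouldRetry` logic quoted above, using the `saslTimeoutSeen` name suggested below (field names are assumptions):
   ```java
   private synchronized boolean shouldRetry(Throwable e) {
     boolean isIOException = e instanceof IOException
       || e.getCause() instanceof IOException;
     boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
     // Once a non-SASL failure shows up after earlier SASL-timeout retries,
     // clear the flag and stop discounting those retries against the budget.
     if (!isSaslTimeout && saslTimeoutSeen) {
       retryCount = 0;
       saslTimeoutSeen = false;
     }
     boolean hasRemainingRetries = retryCount < maxRetries;
     return (isSaslTimeout || isIOException) &&
       hasRemainingRetries && errorHandler.shouldRetryError(e);
   }
   ```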



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -85,6 +86,8 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
   /** Number of times we've attempted to retry so far. */
   private int retryCount = 0;
 
+  private boolean isCurrentSaslTimeout;

Review Comment:
   Nit: rename to "saslTimeoutSeen" ?





[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1067824025


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -245,8 +269,9 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
     throws IOException, InterruptedException {
 
     MapConfigProvider provider = new MapConfigProvider(ImmutableMap.of(
-      "spark.shuffle.io.maxRetries", "2",
-      "spark.shuffle.io.retryWait", "0"));
+        "spark.shuffle.io.maxRetries", "2",
+        "spark.shuffle.io.retryWait", "0",
+        "spark.shuffle.sasl.enableRetries", "true"));

Review Comment:
   I don't think we can follow the same pattern here as `LevelDBHybridStoreSuite`, because the test logic between the two cases also differs; extending would make sense if it didn't. I ended up making the config map a private static field that can be modified per test. This should also be extensible to future tests that need to add new configs.
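   A rough sketch of that per-test config pattern; the defaults mirror the quoted setup, and the `@Before` method name is an assumption:
   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.junit.Before;

   // Shared map reset before each test; individual tests add flags such as
   // spark.shuffle.sasl.enableRetries before calling performInteractions,
   // which would then build its MapConfigProvider from this map.
   private static Map<String, String> configMap;

   @Before
   public void initMap() {
     configMap = new HashMap<>();
     configMap.put("spark.shuffle.io.maxRetries", "2");
     configMap.put("spark.shuffle.io.retryWait", "0");
   }
   ```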





[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070170721


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   `retryCount` is not exposed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070184068


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   Will use the `@VisibleForTesting` annotation.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +245,71 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<ImmutableMap<String, Object>> interactions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      interactions.add(
+          ImmutableMap.<String, Object>builder()
+              .put("b0", saslTimeoutException)
+              .build()
+      );
+    }
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+    verify(listener, times(3)).getTransferType();
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   Will use the `@VisibleForTesting` annotation.
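
   A sketch of what that could look like, assuming Guava's annotation (already available in the network modules) and a simple package-private getter; the accessor name is an assumption:

       import com.google.common.annotations.VisibleForTesting;

       // Inside RetryingBlockTransferor:
       @VisibleForTesting
       synchronized int getRetryCount() {
         return retryCount;
       }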



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] otterc commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1063848374


##########
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java:
##########
@@ -333,6 +333,13 @@ public boolean useOldFetchProtocol() {
     return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
   }
 
+  /** Whether to enable sasl retries. Sasl retries will be enabled, once the shuffle
+   * server is upgraded.

Review Comment:
   Rephrase the comment; there will not be any server-side changes any more.
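
   One possible rephrasing, keeping the key used in the tests; the accessor name and the opt-in default shown here are assumptions:

       /**
        * Whether to retry the SASL handshake when it times out. This is a purely
        * client-side change: each retry opens a new connection and re-runs the
        * handshake, so no shuffle server upgrade is required.
        */
       public boolean enableSaslRetries() {
         return conf.getBoolean("spark.shuffle.sasl.enableRetries", false);
       }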



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1067498072


##########
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala:
##########
@@ -251,6 +251,10 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         }
         handleResult(PushResult(blockId, exception))
       }
+
+      override def onSaslTimeout(): Unit = {
+        TaskContext.get().taskMetrics().incSaslRequestRetries(1)

Review Comment:
   Discussed offline: we will not be making the metric change, as no other network metric is exposed anyway. If we need it down the line, we can add it as a follow-up task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] otterc commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
otterc commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1069777688


##########
common/network-common/src/main/java/org/apache/spark/network/sasl/SaslTimeoutException.java:
##########
@@ -0,0 +1,15 @@
+package org.apache.spark.network.sasl;
+
+public class SaslTimeoutException extends RuntimeException {

Review Comment:
   Nit: Add javadoc
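
   A sketch of the class with a javadoc added; the constructor set is limited to the one used in the tests, and the license header is omitted for brevity:

       package org.apache.spark.network.sasl;

       /**
        * An exception thrown when the client-side SASL handshake times out.
        * Wrapping the underlying TimeoutException in a dedicated type lets
        * callers such as RetryingBlockTransferor distinguish SASL timeouts
        * from ordinary IO failures and apply the retry policy added in this PR.
        */
       public class SaslTimeoutException extends RuntimeException {
         public SaslTimeoutException(Throwable cause) {
           super(cause);
         }
       }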



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java:
##########
@@ -84,6 +87,12 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
   // while inside a synchronized block.
   /** Number of times we've attempted to retry so far. */
   private int retryCount = 0;
+  /**
+   * Map to track blockId to exception that the block is being retried for.
+   * This is mainly used in the case of SASL retries, because we need to set
+   * `retryCount` back to 0 in those cases.
+   */
+  private Map<String, Throwable> blockIdToException;

Review Comment:
   I don't think this mapping is needed. To address @mridulm's comment, you can reset `retryCount` to 0 once there are no more SASL timeouts. You may just need another flag that marks whether the last timeout was a `SaslTimeout` or not.
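
   A sketch of the flag-based approach being suggested; the flag name, method name, and placement are assumptions:

       // Set when a retry is initiated because of a SASL timeout.
       private boolean saslTimeoutSeen = false;

       private synchronized void handleTransferSuccess() {
         if (saslTimeoutSeen) {
           // The SASL handshake eventually succeeded; clear the flag and give
           // subsequent IO failures the full retry budget again.
           retryCount = 0;
           saslTimeoutSeen = false;
         }
       }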



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] akpatnam25 commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1070072578


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +243,50 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   done - `testRepeatedSaslRetryFailures`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1380920219

   @mridulm should be good to review now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a diff in pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38959:
URL: https://github.com/apache/spark/pull/38959#discussion_r1069990003


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java:
##########
@@ -230,6 +243,50 @@ public void testRetryAndUnrecoverable() throws IOException, InterruptedException
     verifyNoMoreInteractions(listener);
   }
 
+  @Test
+  public void testSaslTimeoutFailure() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    TimeoutException timeoutException = new TimeoutException();
+    SaslTimeoutException saslTimeoutException =
+        new SaslTimeoutException(timeoutException);
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        ImmutableMap.<String, Object>builder()
+            .put("b0", saslTimeoutException)
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // SaslTimeout will cause a retry. Since b0 fails, we will retry both.
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new SaslTimeoutException(new TimeoutException()))
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("b0", block0)
+            .build()
+    );
+    configMap.put("spark.shuffle.sasl.enableRetries", "true");
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener).getTransferType();
+    verifyNoMoreInteractions(listener);
+  }

Review Comment:
   Add a test where repeated SASL timeout failures result in failing the request, to make sure there are no regressions from the current codebase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] akpatnam25 commented on pull request #38959: SPARK-41415: SASL Request Retries

Posted by GitBox <gi...@apache.org>.
akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1371548903

   @mridulm, updated the PR so that it has no protocol/server-side changes. In this case, we create a new connection every time a SASL retry is triggered. Confirmed this is the case by throwing simulated exceptions to trigger SASL retries on our cluster.
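
   A simplified sketch of how that plays out in the retry path; apart from `createAndStart` (part of the transfer starter interface quoted in the diffs above), the surrounding structure is illustrative rather than the PR's exact code:

       private synchronized void initiateRetry() {
         retryCount += 1;
         currentListener = new RetryingBlockTransferListener();
         executorService.submit(() -> {
           try {
             // createAndStart() goes back through the underlying fetcher/pusher,
             // which opens a fresh TransportClient, so the SASL handshake is
             // redone on a new connection instead of reusing the timed-out one.
             transferStarter.createAndStart(outstandingBlockIds, currentListener);
           } catch (Exception e) {
             currentListener.onBlockTransferFailure(outstandingBlockIds[0], e);
           }
         });
       }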


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org