Posted to reviews@spark.apache.org by mpetruska <gi...@git.apache.org> on 2017/11/06 12:48:28 UTC

[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

GitHub user mpetruska opened a pull request:

    https://github.com/apache/spark/pull/19671

    [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite "Shuffle registration timeout and maxAttempts conf"

    ## What changes were proposed in this pull request?
    
    [Ticket](https://issues.apache.org/jira/browse/SPARK-22297)
    - one of the tests produces unreliable results because its outcome depends on execution timing (it relies on `Thread.sleep` and attempt counting)
    
    ## How was this patch tested?
    
    The test's assertions remain the same, and the set-up emulates the previous version's behavior.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mpetruska/spark SPARK-22297

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19671.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19671
    
----
commit 6da1d348ee5a14cb4cb34d6c2ca8be824aab5264
Author: Mark Petruska <pe...@gmail.com>
Date:   2017-11-06T12:43:42Z

    fixes test case 'flakiness'

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19671


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merging to master.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83937 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83937/testReport)** for PR 19671 at commit [`0532cc3`](https://github.com/apache/spark/commit/0532cc30e9ae2ab72b278da98cecf956f5a46013).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    I think we should run the test for `executor2` multiple times to avoid the test case being flaky. The test case is flaky mainly because we may lose the `RegisterExecutor` message, which is not related to the `Thread.sleep()` issue.
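
    A minimal sketch of that retry idea (not part of this pull request; `registerExecutor` is a hypothetical helper standing in for the `ExternalShuffleClient` registration call the test makes):

        import org.scalatest.concurrent.Eventually._
        import org.scalatest.time.SpanSugar._

        // Hypothetical helper wrapping the shuffle client registration call in the test.
        def registerExecutor(execId: String): Unit = ???

        // Retry the registration so that a single lost RegisterExecutor message
        // does not make the whole test case flaky.
        eventually(timeout(10.seconds), interval(200.millis)) {
          registerExecutor("executor2")
        }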


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83920 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83920/testReport)** for PR 19671 at commit [`6da1d34`](https://github.com/apache/spark/commit/6da1d348ee5a14cb4cb34d6c2ca8be824aab5264).


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163298390
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    This is to demonstrate that the client only tries the connection once, then fails.


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163297550
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
    +              thirdExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
    +              callback.onSuccess(success)
    +
               }
             }
           }
     
    -      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
    +      val transConf: TransportConf =
    --- End diff --
    
    Not really, although IntelliJ IDEA recommends adding the type annotation. Do you think I should delete it?


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r151371494
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val `timingoutExecutor` = "timingoutExecutor"
    --- End diff --
    
    No real reason, just a mistake made when renaming the value.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83937 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83937/testReport)** for PR 19671 at commit [`0532cc3`](https://github.com/apache/spark/commit/0532cc30e9ae2ab72b278da98cecf956f5a46013).


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83920 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83920/testReport)** for PR 19671 at commit [`6da1d34`](https://github.com/apache/spark/commit/6da1d348ee5a14cb4cb34d6c2ca8be824aab5264).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163331741
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    I mean that this branch shouldn't even be reached by the test, right? Since `tryAgainExecutor` uses `conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")` which is handled above.
    
    Not a big deal though.
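
    A sketch of the client side being referred to (an assumption; the test body is not quoted in this hunk, and `registerExecutor` is a hypothetical helper wrapping the shuffle client call): with max attempts set to 1, the first `onFailure` surfaces to the caller, so the success-on-retry branch above should indeed never run.

        // Only a single registration attempt is allowed for this executor.
        conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")

        // The single attempt hits the onFailure(failure) branch in the handler,
        // so the registration call is expected to fail with tryAgainMsg.
        val e = intercept[Exception] {
          registerExecutor(tryAgainExecutor) // hypothetical helper, see above
        }
        assert(e.getMessage.contains(tryAgainMsg))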


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Jenkins, retest this please.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    retest this please


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83913/testReport)** for PR 19671 at commit [`6da1d34`](https://github.com/apache/spark/commit/6da1d348ee5a14cb4cb34d6c2ca8be824aab5264).


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    @vanzin How often does this test case fail on your local environment? Thanks!


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163476983
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    Yes, that's the point: execution should not reach this branch. But since it runs on a different thread (in an actor receive), the best solution is to report back to the client "thread", which would then cause a test failure.
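
    For illustration only (an assumption, not code from this PR): the branch could reply with a failure instead of a success, so that an unexpected extra attempt is reported back to the client and surfaces as a test failure.

        case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
          // Should not be reached when max attempts is 1; reply with an error
          // so the client side fails loudly instead of passing silently.
          callback.onFailure(new Exception(s"unexpected retry for ${exec.execId}"))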


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83913/
    Test FAILed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86579/
    Test FAILed.


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163082224
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    This should be an error, right? Because the test is setting max attempts at 1 for this executor.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #86579 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86579/testReport)** for PR 19671 at commit [`5319ae3`](https://github.com/apache/spark/commit/5319ae31f6fbba0b5ffe92ff934e170966fbe470).


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    @jiangxb1987 : Well, I ran the `BlockManagerSuite` with `testOnly` and did not encounter any failures; it passed 10 out of 10 runs. Then, as a benchmark, I checked out the version without my changes, ran the tests the same way, and got the same result: 10 passed out of 10.
    
    I have these theories on the matter:
    - some other change fixed the issue
    - test failures only manifest when running multiple other tests in parallel
    - there's some setting/hardware on my machine that makes it very unlikely for the test to fail
    
    So, it turns out that unfortunately I cannot validate on my machine that removing the `Thread.sleep`s solves the flakiness issue.
    
    Can you please try running the tests on your machine with and without the proposed changes?
    (`4bacddb602e19fcd4e1ec75a7b10bed524e6989a` vs `SPARK-22297`)


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163323152
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
    +              thirdExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
    +              callback.onSuccess(success)
    +
               }
             }
           }
     
    -      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
    +      val transConf: TransportConf =
    --- End diff --
    
    Yes, since there's not really any change to this line. We omit the type of local variables in general.
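
    For reference, the form without the annotation, as on the removed line above, would be:

        // The type of the local is inferred; no explicit TransportConf annotation.
        val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)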


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #86579 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86579/testReport)** for PR 19671 at commit [`5319ae3`](https://github.com/apache/spark/commit/5319ae31f6fbba0b5ffe92ff934e170966fbe470).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83937/
    Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #86587 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86587/testReport)** for PR 19671 at commit [`5319ae3`](https://github.com/apache/spark/commit/5319ae31f6fbba0b5ffe92ff934e170966fbe470).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #83913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83913/testReport)** for PR 19671 at commit [`6da1d34`](https://github.com/apache/spark/commit/6da1d348ee5a14cb4cb34d6c2ca8be824aab5264).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163631100
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    It's fine. At worst it's redundant code.


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163081760
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
    +              thirdExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
    +              callback.onSuccess(success)
    +
               }
             }
           }
     
    -      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
    +      val transConf: TransportConf =
    --- End diff --
    
    Change is not necessary?


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r151253919
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val `timingoutExecutor` = "timingoutExecutor"
    --- End diff --
    
    Why the backticks?


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83920/
    Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    It's not a question of how often; it's that it fails at all. That's the definition of flaky.
    
    It obviously doesn't fail all the time, which is why it's a minor bug.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    retest this please


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/191/
    Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    **[Test build #86587 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86587/testReport)** for PR 19671 at commit [`5319ae3`](https://github.com/apache/spark/commit/5319ae31f6fbba0b5ffe92ff934e170966fbe470).


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86587/
    Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/185/
    Test PASSed.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    Jenkins, retest this please.


---



[GitHub] spark issue #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManagerSuite ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19671
  
    ok to test


---



[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Posted by mpetruska <gi...@git.apache.org>.
Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163477899
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    --- End diff --
    
    Of course, I can remove/modify the case, if you'd like...


---
