Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2018/01/22 22:04:34 UTC

[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...

Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163081760
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
    +              callback.onSuccess(success)
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
    +              thirdExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
    +              callback.onSuccess(success)
    +
               }
             }
           }
     
    -      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
    +      val transConf: TransportConf =
    --- End diff ---
    
    This change doesn't seem necessary?
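
For context on the line flagged above: SparkTransportConf.fromSparkConf already returns a TransportConf (the very type the added annotation names), so Scala infers the same type whether or not the annotation is written out. Below is a minimal sketch of the two equivalent forms, assuming the standard Spark package locations and the call exactly as shown in the diff; the object and value names are illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf
    import org.apache.spark.network.util.TransportConf

    object TransportConfInferenceSketch {
      private val conf = new SparkConf()

      // Type is inferred as TransportConf from fromSparkConf's return type.
      val inferred = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)

      // The explicit annotation names the same type; it changes nothing at runtime.
      val annotated: TransportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
    }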

