You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2018/01/22 22:04:34 UTC
[GitHub] spark pull request #19671: [SPARK-22297][CORE TESTS] Flaky test: BlockManage...
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19671#discussion_r163081760
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
val tryAgainMsg = "test_spark_20640_try_again"
+ val timingoutExecutor = "timingoutExecutor"
+ val tryAgainExecutor = "tryAgainExecutor"
+ val succeedingExecutor = "succeedingExecutor"
+
// a server which delays response 50ms and must try twice for success.
def newShuffleServer(port: Int): (TransportServer, Int) = {
- val attempts = new mutable.HashMap[String, Int]()
+ val failure = new Exception(tryAgainMsg)
+ val success = ByteBuffer.wrap(new Array[Byte](0))
+
+ var secondExecutorFailedOnce = false
+ var thirdExecutorFailedOnce = false
+
val handler = new NoOpRpcHandler {
override def receive(
client: TransportClient,
message: ByteBuffer,
callback: RpcResponseCallback): Unit = {
val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
msgObj match {
- case exec: RegisterExecutor =>
- Thread.sleep(50)
- val attempt = attempts.getOrElse(exec.execId, 0) + 1
- attempts(exec.execId) = attempt
- if (attempt < 2) {
- callback.onFailure(new Exception(tryAgainMsg))
- return
- }
- callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+
+ case exec: RegisterExecutor if exec.execId == timingoutExecutor =>
+ () // No reply to generate client-side timeout
+
+ case exec: RegisterExecutor
+ if exec.execId == tryAgainExecutor && !secondExecutorFailedOnce =>
+ secondExecutorFailedOnce = true
+ callback.onFailure(failure)
+
+ case exec: RegisterExecutor if exec.execId == tryAgainExecutor =>
+ callback.onSuccess(success)
+
+ case exec: RegisterExecutor
+ if exec.execId == succeedingExecutor && !thirdExecutorFailedOnce =>
+ thirdExecutorFailedOnce = true
+ callback.onFailure(failure)
+
+ case exec: RegisterExecutor if exec.execId == succeedingExecutor =>
+ callback.onSuccess(success)
+
}
}
}
- val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+ val transConf: TransportConf =
--- End diff --
Change is not necessary?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org