Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/27 09:58:00 UTC

[jira] [Commented] (SPARK-40932) Barrier: messages for allGather will be overridden by the following barrier APIs

    [ https://issues.apache.org/jira/browse/SPARK-40932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625002#comment-17625002 ] 

Apache Spark commented on SPARK-40932:
--------------------------------------

User 'wbo4958' has created a pull request for this issue:
https://github.com/apache/spark/pull/38410

> Barrier: messages for allGather will be overridden by the following barrier APIs
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-40932
>                 URL: https://issues.apache.org/jira/browse/SPARK-40932
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0, 3.3.1
>            Reporter: Bobby Wang
>            Priority: Critical
>
> While working on an internal project that has not been open-sourced, I found a bug: the messages returned by BarrierTaskContext.allGather may be overridden by subsequent barrier APIs, which means the user can't get the correct allGather messages.
>  
> This issue can easily be reproduced with the following unit test.
>  
>  
> {code:java}
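> // Needs scala.util.Random and org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext} in scope.
> test("SPARK-XXX, messages of allGather should not be overridden " +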
>   "by the following barrier APIs") {
>   sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
>   sc.setLogLevel("INFO")
>   val rdd = sc.makeRDD(1 to 10, 2)
>   val rdd2 = rdd.barrier().mapPartitions { it =>
>     val context = BarrierTaskContext.get()
>     // Sleep for a random time before global sync.
>     Thread.sleep(Random.nextInt(1000))
>     // Pass partitionId message in
>     val message: String = context.partitionId().toString
>     val messages: Array[String] = context.allGather(message)
>     context.barrier()
>     Iterator.single(messages.toList)
>   }
>   val messages = rdd2.collect()
>   // All the task partitionIds are shared across all tasks
>   assert(messages.length === 2)
>   messages.foreach(m => println("------- " + m))
>   assert(messages.forall(_ == List("0", "1")))
> } {code}
>  
>  
> Before the exception is thrown by assert(messages.forall(_ == List("0", "1"))), the printed output is:
>  
> {code:java}
> ------- List(, )
> ------- List(, ) {code}
>  
>  
> As you can see, the gathered messages are empty: they have been overridden by the context.barrier() call.
>  
> Below is the Spark log:
>  
> _22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)_
> _22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)_
> _22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
> _22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
> _22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
> _22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
> _22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
> _22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
> _22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
> _22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
> _22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
> _22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
> _22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
> _22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
> _22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
> _22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
> _22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
> _22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
> _22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
> _22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes result sent to driver_
> _22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 2 slots, while the total number of available slots is 1._
> _22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)_
> _22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
> _22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes result sent to driver_
>  
> After debugging, I found that the [messages object|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102] (Array[String]) returned to BarrierTaskContext is the same object as the [original messages|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107] array, so later barrier calls overwrite its contents.
>  
> I will file a PR for this issue.
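
The aliasing described above can be illustrated without Spark. The following is a minimal sketch, not Spark's implementation and not necessarily the change made in the pull request: ToyCoordinator, copyOnReply, and gatherThenBarrier are hypothetical names invented for this example. It assumes that one buffer is reused for every sync round, that a reply inside a single JVM is handed over by reference, and that barrier() contributes empty messages, which is consistent with the empty strings printed in the output above.

```scala
// Hypothetical stand-in for the coordinator-side state; not Spark's actual class.
final class ToyCoordinator(numTasks: Int, copyOnReply: Boolean) {
  // One slot per task, reused across every sync round.
  private val messages: Array[String] = Array.fill(numTasks)("")

  // Runs one sync round in which task i contributes msgs(i), then returns the reply.
  def sync(msgs: Seq[String]): Array[String] = {
    require(msgs.length == numTasks, "one message per task is expected")
    msgs.zipWithIndex.foreach { case (m, i) => messages(i) = m }
    // Without a copy, the caller ends up holding the coordinator's own buffer.
    if (copyOnReply) messages.clone() else messages
  }
}

object AllGatherAliasingSketch {
  // Mimics allGather(partitionId) followed by barrier() with empty messages.
  def gatherThenBarrier(copyOnReply: Boolean): List[String] = {
    val coordinator = new ToyCoordinator(numTasks = 2, copyOnReply = copyOnReply)
    val gathered = coordinator.sync(Seq("0", "1")) // stands in for context.allGather(...)
    coordinator.sync(Seq("", ""))                  // stands in for context.barrier()
    gathered.toList
  }

  def main(args: Array[String]): Unit = {
    println(gatherThenBarrier(copyOnReply = false)) // List(, )
    println(gatherThenBarrier(copyOnReply = true))  // List(0, 1)
  }
}
```

With the shared buffer, the second round overwrites what the first caller is still holding. Returning a defensive copy keeps each round's result independent, at the cost of one small array allocation per reply.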



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
