Posted to issues@spark.apache.org by "Bobby Wang (Jira)" <ji...@apache.org> on 2022/10/27 09:25:00 UTC

[jira] [Created] (SPARK-40932) Barrier: messages for allGather will be overridden by the following barrier APIs

Bobby Wang created SPARK-40932:
----------------------------------

             Summary: Barrier: messages for allGather will be overridden by the following barrier APIs
                 Key: SPARK-40932
                 URL: https://issues.apache.org/jira/browse/SPARK-40932
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.1, 3.3.0
            Reporter: Bobby Wang


I found this bug while working on an internal project that has not been open sourced. The messages returned by BarrierTaskContext.allGather may be overridden by subsequent barrier API calls, which means the user cannot get the correct allGather messages.

 

This issue can be easily reproduced with the following unit test.

 

 
{code:java}
test("SPARK-XXX, messages of allGather should not be overridden " +
  "by the following barrier APIs") {

  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
  sc.setLogLevel("INFO")
  val rdd = sc.makeRDD(1 to 10, 2)
  val rdd2 = rdd.barrier().mapPartitions { it =>
    val context = BarrierTaskContext.get()
    // Sleep for a random time before global sync.
    Thread.sleep(Random.nextInt(1000))
    // Pass partitionId message in
    val message: String = context.partitionId().toString
    val messages: Array[String] = context.allGather(message)
    context.barrier()
    Iterator.single(messages.toList)
  }
  val messages = rdd2.collect()
  // All the task partitionIds are shared across all tasks
  assert(messages.length === 2)
  messages.foreach(m => println("------- " + m))
  assert(messages.forall(_ == List("0", "1")))
}
{code}
 

 

Before the final assertion, assert(messages.forall(_ == List("0", "1"))), throws an exception, the test prints:

 
{code:java}
------- List(, )
------- List(, )
{code}
 

 

As you can see, the gathered messages are empty: they have been overridden by the subsequent context.barrier() call.

 

Below is the Spark log:

 

_22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)_
_22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)_
_22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
_22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
_22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
_22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
_22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2._
_22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1._
_22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully._
_22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes result sent to driver_
_22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 2 slots, while the total number of available slots is 1._
_22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)_
_22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes result sent to driver_

 

After debugging, I found that the [messages object|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102] (Array[String]) returned to BarrierTaskContext is the same object as the [original messages array|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107] in BarrierCoordinator, so a later barrier call that updates that array also changes the result that allGather has already returned.
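
The shared-array behaviour can be illustrated with a toy model. This is only a minimal sketch and not Spark's code: ToyCoordinator, gatherShared, gatherCopied and AliasingDemo are hypothetical names, and the assumption that a later round writes empty strings is taken from the empty entries in the printed output above. It shows that a caller holding the coordinator's own array sees later writes, while a caller holding a copy does not.

{code:scala}
// Hypothetical stand-in for a coordinator that reuses one mutable buffer
// across sync rounds. This is NOT Spark's BarrierCoordinator.
final class ToyCoordinator(numTasks: Int) {
  private val buffer: Array[String] = Array.fill(numTasks)("")

  // Stores one message per task, then returns the internal buffer itself.
  def gatherShared(msgs: Seq[String]): Array[String] = {
    msgs.zipWithIndex.foreach { case (m, i) => buffer(i) = m }
    buffer
  }

  // Same bookkeeping, but the caller receives a private copy.
  def gatherCopied(msgs: Seq[String]): Array[String] =
    gatherShared(msgs).clone()
}

object AliasingDemo {
  def run(): (List[String], List[String]) = {
    val coordinator = new ToyCoordinator(2)
    val shared = coordinator.gatherShared(Seq("0", "1"))
    val copied = coordinator.gatherCopied(Seq("0", "1"))
    // A later round with empty messages (assumed here) reuses the buffer.
    coordinator.gatherShared(Seq("", ""))
    // shared now reflects the later round; copied keeps the gathered values.
    (shared.toList, copied.toList)
  }
}
{code}

In this toy model the first element of the returned pair is List("", "") and the second is List("0", "1"). Whether copying is the right fix for Spark itself is a separate question for the PR.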

 

I will file a PR for this issue.


