Posted to issues@spark.apache.org by "Jason Xu (Jira)" <ji...@apache.org> on 2022/03/02 08:36:00 UTC

[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

     [ https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Xu updated SPARK-38388:
-----------------------------
    Description: 
Spark repartition uses RoundRobinPartitioning; the generated result is non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition is called on it, and a result stage (possibly more stages) follows, as in the pattern below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage may already have finished while others fail, and the shuffle files on that executor are lost, so some tasks from the previous stage (upstream data generation, repartition) have to rerun to regenerate the shuffle files the failed tasks depend on.
Because the data has some randomness, the data regenerated by the retried upstream tasks is slightly different, the repartition then produces an inconsistent row ordering and row-to-partition assignment, and the retried tasks of the result stage generate different data.
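
For intuition, here is a minimal sketch in plain Scala (an illustration only, not Spark's actual shuffle code) of why round-robin assignment is order-sensitive: a row's target partition depends only on its position in the task's output, so when a retried upstream task emits rows in a different order (or with different values), the same logical row can land in a different shuffle partition.
{code:scala}
// Hypothetical illustration, not Spark internals: deal rows to partitions round-robin.
def roundRobinAssign[T](rows: Seq[T], numPartitions: Int): Map[T, Int] =
  rows.zipWithIndex.map { case (row, i) => row -> (i % numPartitions) }.toMap

val firstAttempt = Seq("a", "b", "c", "d")   // output order of the original upstream task
val retryAttempt = Seq("a", "c", "b", "d")   // same rows, different order after a retry

roundRobinAssign(firstAttempt, 2)  // Map(a -> 0, b -> 1, c -> 0, d -> 1)
roundRobinAssign(retryAttempt, 2)  // Map(a -> 0, c -> 1, b -> 0, d -> 1): "b" and "c" swap partitions
{code}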

This is similar to, but different from, SPARK-23207 (https://issues.apache.org/jira/browse/SPARK-23207). The fix for that issue adds an extra local sort to make the row ordering deterministic; the sort simply compares the binary representation of each row/record. In this case, however, the upstream data itself has randomness, so the sort cannot restore the same ordering across attempts, and RoundRobinPartitioning still produces a non-deterministic result.
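
To illustrate the difference with a sketch (plain Scala, hypothetical values): a local sort restores an identical sequence when the same deterministic rows merely come back in a different order, but when the rows embed fresh randomness (like rand()), a retried task regenerates different values, so even the sorted sequence differs and the round-robin assignment still changes.
{code:scala}
import scala.util.Random

// Deterministic rows that are merely reordered on retry: the extra local sort
// restores the same sequence, so the round-robin assignment stays stable.
val attempt1 = Seq(3L, 1L, 2L).sorted   // List(1, 2, 3)
val retry1   = Seq(2L, 3L, 1L).sorted   // List(1, 2, 3), same order as attempt1

// Rows that embed fresh randomness: the retried task produces different values,
// so the sorted sequences differ and rows are dealt to different partitions.
val attempt2 = Seq.fill(3)(Random.nextDouble()).sorted
val retry2   = Seq.fill(3)(Random.nextDouble()).sorted   // almost surely differs from attempt2
{code}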

The following code returns 998818 instead of 1000000:
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

// Upstream stage with randomness (rand()), followed by a round-robin repartition.
val ds = spark.range(0, 1000 * 1000, 1)
  .repartition(100, $"id")
  .withColumn("val", rand())
  .repartition(100)
  .map { row =>
    // On the first attempt of the last result partition only, run `pkill -f java`
    // on that host to kill the executor JVMs there, so their shuffle files are lost
    // and the upstream stage has to be retried; retried attempts pass through normally.
    if (TaskContext.get.stageAttemptNumber == 0 &&
        TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId > 98) {
      throw new Exception("pkill -f java".!!)
    }
    TestObject(row.getLong(0), row.getDouble(1))
  }

ds.toDF("id", "value").write.saveAsTable("tmp.test_table")

// Expected 1000000 distinct ids; returns fewer (e.g. 998818) when the bug is hit.
spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:bash}
spark-shell --num-executors 10 --conf spark.dynamicAllocation.enabled=false --conf spark.shuffle.service.enabled=false{code}
To simulate the issue, the external shuffle service (and dynamic allocation, if either is enabled by default in your environment) needs to be disabled; this ensures the shuffle files are lost together with the executors, which triggers the previous-stage retries.
In our production environment the external shuffle service is enabled, and this data correctness issue still happened when there were node losses.
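
Once the job completes, a quick way to confirm the corruption (a hedged sketch, reusing the table name from the repro above) is to compare the total row count with the number of distinct ids; with correct results both should be 1000000.
{code:scala}
// Hypothetical verification step: after the bug is hit, some ids are missing
// and/or written more than once, so the two numbers no longer match the input.
spark.table("tmp.test_table").count()
spark.sql("select count(distinct id) from tmp.test_table").show()
{code}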

Although there is some non-determinism in the upstream data, users wouldn't expect to see an incorrect result.


> Repartition + Stage retries could lead to incorrect data 
> ---------------------------------------------------------
>
>                 Key: SPARK-38388
>                 URL: https://issues.apache.org/jira/browse/SPARK-38388
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0, 3.1.1
>         Environment: Spark 2.4 and 3.1
>            Reporter: Jason Xu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org