You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Steve Loughran (JIRA)" <ji...@apache.org> on 2016/02/29 12:06:18 UTC

[jira] [Commented] (SPARK-10063) Remove DirectParquetOutputCommitter

    [ https://issues.apache.org/jira/browse/SPARK-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15171744#comment-15171744 ] 

Steve Loughran commented on SPARK-10063:
----------------------------------------

The fault is having speculation turned on, rather than the committer itself. Best to add a way for the system to detect that the output is going to an object store with potential consistency issues, and reject.

In HADOOP-9545 we've been considering an explicit object store API, one which uses PUT to write stuff, rather than pretend that the output stream is writing stuff and that {{close()}} is a low-cost, minimal side-effect operation.

> Remove DirectParquetOutputCommitter
> -----------------------------------
>
>                 Key: SPARK-10063
>                 URL: https://issues.apache.org/jira/browse/SPARK-10063
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Yin Huai
>            Priority: Critical
>
> When we use DirectParquetOutputCommitter on S3 and speculation is enabled, there is a chance that we can loss data. 
> Here is the code to reproduce the problem.
> {code}
> import org.apache.spark.sql.functions._
> val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask", (i: Int, partitionId: Int, attemptNumber: Int) => {
>   if (partitionId == 0 && i == 5) {
>     if (attemptNumber > 0) {
>       Thread.sleep(15000)
>       throw new Exception("new exception")
>     } else {
>       Thread.sleep(10000)
>     }
>   }
>   
>   i
> })
> val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
>   val context = org.apache.spark.TaskContext.get()
>   val partitionId = context.partitionId
>   val attemptNumber = context.attemptNumber
>   iter.map(i => (i, partitionId, attemptNumber))
> }.toDF("i", "partitionId", "attemptNumber")
> df
>   .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"), $"partitionId", $"attemptNumber")
>   .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
> sqlContext.read.load("/home/yin/outputCommitter").count
> // The result is 99 and 5 is missing from the output.
> {code}
> What happened is that the original task finishes first and uploads its output file to S3, then the speculative task somehow fails. Because we have to call output stream's close method, which uploads data to S3, we actually uploads the partial result generated by the failed speculative task to S3 and this file overwrites the correct file generated by the original task.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org