Posted to issues@spark.apache.org by "Nathan McCarthy (JIRA)" <ji...@apache.org> on 2015/06/17 10:44:00 UTC

[jira] [Commented] (SPARK-8406) Race condition when writing Parquet files

    [ https://issues.apache.org/jira/browse/SPARK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589498#comment-14589498 ] 

Nathan McCarthy commented on SPARK-8406:
----------------------------------------

This is hitting us hard. Let me know if there is anything we can do to help on this end with contributing a fix or testing. 

FYI, here are the details from the mailing list.

 ---

We are trying to save a data frame with 569610608 rows:

  dfc.write.format("parquet").save("/data/map_parquet_file")

We get random results between runs. Caching the data frame in memory makes no difference. It looks like the write misses some of the RDD partitions. We have an RDD with 6750 partitions, but the write produces fewer files than there are partitions. When reading the data back in and running a count, we get a smaller number of rows.

I’ve tried counting the rows in several different ways. All return the same result, 560214031 rows, which is about 9.4 million rows short (1.65%).

  qc.read.parquet("/data/map_parquet_file").count
  qc.read.parquet("/data/map_parquet_file").rdd.count
  qc.read.parquet("/data/map_parquet_file").mapPartitions{itr => var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }.reduce(_ + _)

Looking at the files on HDFS, there are 6643 .parquet files, so 107 partitions are missing (about 1.6%).

Writing out the same cached DF again to a new location gives 6717 files on HDFS (33 files missing, or about 0.5%):

  dfc.write.parquet("/data/map_parquet_file_2")

And we get 566670107 rows back (about 3 million missing, ~0.5%):

  qc.read.parquet("/data/map_parquet_file_2").count
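
As a quick cross-check of the shortfalls quoted above, the arithmetic can be sketched in plain Scala. The object and method names are made up for illustration; the row and file counts are the ones reported in this comment.

```scala
object Shortfall {
  // Number of rows (or files) that did not make it into the output.
  def missing(expected: Long, actual: Long): Long = expected - actual

  // The same shortfall as a percentage of the expected total.
  def missingPct(expected: Long, actual: Long): Double =
    100.0 * missing(expected, actual) / expected

  def main(args: Array[String]): Unit = {
    val expectedRows  = 569610608L // rows in the source data frame
    val expectedFiles = 6750L      // one part file per RDD partition

    // (label, expected, actual) for both Parquet write attempts
    val runs = Seq(
      ("run 1 rows",  expectedRows,  560214031L),
      ("run 1 files", expectedFiles, 6643L),
      ("run 2 rows",  expectedRows,  566670107L),
      ("run 2 files", expectedFiles, 6717L)
    )
    for ((label, expected, actual) <- runs)
      println(f"$label: ${missing(expected, actual)}%d missing (${missingPct(expected, actual)}%.2f%%)")
  }
}
```

In both runs the percentage of missing rows is close to the percentage of missing files, which is consistent with whole part files being lost rather than individual rows.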

Writing the same DF out as JSON produces the expected number (6750) of part files, and reading it back returns the right number of rows, 569610608.

  dfc.write.format("json").save("/data/map_parquet_file_3")
  qc.read.format("json").load("/data/map_parquet_file_3").count

One thing to note is that the Parquet part files on HDFS do not have the normal sequential part numbers that we see for the JSON output and for Parquet output in Spark 1.3.

part-r-06151.gz.parquet  part-r-118401.gz.parquet  part-r-146249.gz.parquet  part-r-196755.gz.parquet  part-r-35811.gz.parquet   part-r-55628.gz.parquet  part-r-73497.gz.parquet  part-r-97237.gz.parquet
part-r-06161.gz.parquet  part-r-118406.gz.parquet  part-r-146254.gz.parquet  part-r-196763.gz.parquet  part-r-35826.gz.parquet   part-r-55647.gz.parquet  part-r-73500.gz.parquet  _SUCCESS
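
A small helper along these lines could be used to check a directory listing for non-sequential part numbers. It is only a sketch: the object and method names are hypothetical, and the file name pattern is simply the one visible in the listing above.

```scala
object PartFileIds {
  // Matches names such as "part-r-06151.gz.parquet" and captures the numeric ID.
  private val PartFile = """part-r-(\d+)\.gz\.parquet""".r

  // Extract the sorted part IDs, ignoring anything else (e.g. _SUCCESS).
  def partIds(fileNames: Seq[String]): Seq[Int] =
    fileNames.collect { case PartFile(id) => id.toInt }.sorted

  // True when every ID is exactly one greater than the previous one.
  def isSequential(ids: Seq[Int]): Boolean = {
    val sorted = ids.sorted
    sorted.zip(sorted.drop(1)).forall { case (a, b) => b == a + 1 }
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "part-r-06151.gz.parquet",
      "part-r-06161.gz.parquet",
      "part-r-118401.gz.parquet",
      "_SUCCESS"
    )
    val ids = partIds(sample)
    println(s"ids = ${ids.mkString(", ")}, sequential = ${isSequential(ids)}")
  }
}
```

Note that some IDs in the listing (for example 196755) are far larger than the 6750 partitions that were written.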

We are using MapR 4.0.2 for HDFS.

> Race condition when writing Parquet files
> -----------------------------------------
>
>                 Key: SPARK-8406
>                 URL: https://issues.apache.org/jira/browse/SPARK-8406
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>            Priority: Blocker
>
> To support appending, the Parquet data source tries to find the max ID of the part-files in the destination directory (the <id> in the output file name "part-r-<id>.gz.parquet") at the beginning of the write job. In 1.3.0, this step happens on the driver side before any files are written. In 1.4.0, however, it was moved to the task side. Tasks scheduled later may therefore see a wrong max ID, inflated by files that other, already finished tasks of the same job have just written. This is a race condition. In most cases it only causes nonconsecutive IDs in the output file names. But when the DataFrame contains thousands of RDD partitions, it is likely that two tasks choose the same ID, so that the output of one is overwritten by the other.
> The data loss itself is not easy to reproduce, but the following Spark shell snippet reproduces the nonconsecutive output file IDs:
> {code}
> sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
> {code}
> "16" can be replaced with any integer greater than the default parallelism on your machine (usually the number of cores; on my machine it's 8).
> {noformat}
> -rw-r--r--   3 lian supergroup          0 2015-06-17 00:06 /user/lian/foo/_SUCCESS
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00001.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00002.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00003.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00004.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00005.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00006.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00007.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00008.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00017.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00018.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00019.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00020.gz.parquet
> -rw-r--r--   3 lian supergroup        352 2015-06-17 00:06 /user/lian/foo/part-r-00021.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00022.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00023.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00024.gz.parquet
> {noformat}
> Note that the newly added ORC data source doesn't suffer from this issue because it uses both the task ID and {{System.currentTimeMillis()}} to generate the output file name.
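
The race described in the issue can be illustrated with a small plain-Scala model that does not need Spark. Everything in it is an assumption made for illustration: the naming rule "ID = max existing ID + task index + 1" is not taken from the Spark source (it merely happens to reproduce the listing above), tasks are assumed to run in fixed waves of `slots` tasks, and all names are hypothetical.

```scala
object PartIdRace {
  // Assumed naming rule, for illustration only (not taken from the Spark source).
  def fileId(taskIndex: Int, maxSeenId: Int): Int = maxSeenId + taskIndex + 1

  // Task-side lookup: each wave of `slots` tasks sees the files written by
  // all earlier waves, so later waves start from an inflated max ID.
  def taskSideIds(numTasks: Int, slots: Int): Vector[Int] =
    (0 until numTasks).grouped(slots).foldLeft(Vector.empty[Int]) { (written, wave) =>
      val maxSeen = if (written.isEmpty) 0 else written.max
      written ++ wave.map(task => fileId(task, maxSeen))
    }

  // Driver-side lookup: the max ID is read once, before any task writes a file.
  def driverSideIds(numTasks: Int, existingMaxId: Int): Vector[Int] =
    (0 until numTasks).map(task => fileId(task, existingMaxId)).toVector

  def main(args: Array[String]): Unit = {
    // 16 partitions on 8 slots, as in the snippet from the issue description.
    println(taskSideIds(16, 8).mkString(" "))   // 1 2 3 4 5 6 7 8 17 18 19 20 21 22 23 24
    println(driverSideIds(16, 0).mkString(" ")) // 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16

    // A collision: task 9 sees max ID 8 while task 10 still sees max ID 7.
    println(fileId(9, 8) == fileId(10, 7))      // true
  }
}
```

Under this model, two tasks collide whenever the sum of task index and observed max ID is the same for both, which becomes more likely as the number of partitions and concurrently finishing tasks grows.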


