Posted to issues@spark.apache.org by "Rowan Chattaway (JIRA)" <ji...@apache.org> on 2015/05/20 14:01:00 UTC

[jira] [Comment Edited] (SPARK-7755) MetadataCache.refresh does not take into account _SUCCESS

    [ https://issues.apache.org/jira/browse/SPARK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552173#comment-14552173 ] 

Rowan Chattaway edited comment on SPARK-7755 at 5/20/15 12:00 PM:
------------------------------------------------------------------

These are the kinds of errors you can get:
java.lang.RuntimeException: hdfs://xxx:9002/xxx/xxx/xxx/Version=680/_temporary/0/_temporary/attempt_201505191723_0002_r_000001_0/part-r-00002.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [82, 106, 0, 0]
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:301)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:300)
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
.
.
.
	at scala.collection.parallel.package$$anon$1.alongWith(package.scala:85)
	at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
	at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
	at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
	at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

These files were written with a call to dataFrame.saveAsParquet() on Spark 1.3.1.
However, in this case the process doing the write was killed, leaving these _temporary files behind.
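
For reference, the expected magic number [80, 65, 82, 49] in the error above is the ASCII encoding of "PAR1", which a completely written Parquet file carries in its last four bytes. A minimal sketch of that tail check follows, in Python against a local file rather than HDFS. The function name has_parquet_magic is hypothetical and is not part of Spark or parquet-mr:

```python
import os

PARQUET_MAGIC = b"PAR1"  # bytes 80, 65, 82, 49


def has_parquet_magic(path):
    """Return True if the local file at `path` ends with the Parquet magic bytes.

    Hypothetical helper for illustration only; it checks the tail and nothing else.
    """
    size = os.path.getsize(path)
    if size < len(PARQUET_MAGIC):
        return False
    with open(path, "rb") as f:
        # Jump to the last four bytes and compare them with the magic number.
        f.seek(size - len(PARQUET_MAGIC))
        return f.read(len(PARQUET_MAGIC)) == PARQUET_MAGIC
```

A file cut off mid-write, like the part file in the trace, will normally fail this check. It is only a cheap pre-filter, though: it does not validate the footer itself.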

I take your point about _SUCCESS being optional.
Can you explain how DirectParquetOutputCommitter might address this?
If it doesn't, perhaps we could introduce a property that, when set, checks for the presence of _SUCCESS?

I think the default case for most people is that the _SUCCESS file is present, and that they would expect partially written files like these to be filtered out.
Therefore a property that enables the _SUCCESS check and defaults to true would be correct for most use cases.
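
A rough sketch of what such a property-gated check could look like is below. It is written in Python against a local directory tree purely for illustration. The names discover_parquet_files and require_success are hypothetical and do not correspond to any existing Spark API or configuration key. It also assumes that every directory holding data files is itself a job output directory, so that _SUCCESS sits next to the part files:

```python
import os


def discover_parquet_files(root, require_success=True):
    """List *.parquet files under `root`.

    Hypothetical helper: when require_success is True, a directory contributes
    files only if it also contains a _SUCCESS marker. When it is False, every
    *.parquet file is returned, including leftovers under _temporary.
    """
    found = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # deterministic traversal order
        if require_success and "_SUCCESS" not in filenames:
            continue  # no marker here: treat this directory as unfinished
        found.extend(
            os.path.join(dirpath, name)
            for name in sorted(filenames)
            if name.endswith(".parquet")
        )
    return found
```

With the flag left at its default, the attempt directories under _temporary are skipped because they never contain a _SUCCESS marker. Passing require_success=False gives the unfiltered listing for outputs where the marker is not written.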

Thoughts?


> MetadataCache.refresh does not take into account _SUCCESS
> ---------------------------------------------------------
>
>                 Key: SPARK-7755
>                 URL: https://issues.apache.org/jira/browse/SPARK-7755
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Rowan Chattaway
>            Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When you make a call to sqlc.parquetFile(path) where that path contains partially written files, refresh will fail in strange ways when it attempts to read the file footers.
> I would like to adjust the file discovery to take the presence of _SUCCESS into account and therefore only attempt to read if we have the success marker.
> I have made the changes locally and they don't appear to have any side effects.
> What are people's thoughts about this?


