You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rowan Chattaway (JIRA)" <ji...@apache.org> on 2015/05/20 14:01:00 UTC
[jira] [Comment Edited] (SPARK-7755) MetadataCache.refresh does not
take into account _SUCCESS
[ https://issues.apache.org/jira/browse/SPARK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552173#comment-14552173 ]
Rowan Chattaway edited comment on SPARK-7755 at 5/20/15 12:00 PM:
------------------------------------------------------------------
These the the kinds of errors you can get:
java.lang.RuntimeException: hdfs://xxx:9002/xxx/xxx/xxx/Version=680/_temporary/0/_temporary/attempt_201505191723_0002_r_000001_0/part-r-00002.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [82, 106, 0, 0]
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:301)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:300)
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
.
.
.
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:85)
at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
These files were written down with a call to dataFrame.saveAsParquet() - on spark 1.3.1.
However, in this case, the process that was doing this was killed, leaving these _temporary files.
I take your point about _SUCCESS being optional.
Can you explain how DirectParquetOutputCommitter might address this?
If it doesn't, perhaps introducing a property that will check for the presence of _SUCCESS if set?
I think the default case for most people is that the _SUCCESS file is present and that you would expect these to be filtered out.
Therefore a property that defaults to true for _success check would be correct for most use cases.
Thoughts?
was (Author: rowan.chattaway@googlemail.com):
These the the kinds of errors you can get:
java.lang.RuntimeException: hdfs://xxx:9002/xxx/xxx/xxx/Version=680/_temporary/0/_temporary/attempt_201505191723_0002_r_000001_0/part-r-00002.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [82, 106, 0, 0]
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:301)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:300)
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
.
.
.
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:85)
at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
These files were written down with a call to dataFrame.saveAsParquet() - on spark 1.3.1.
However, in this case, the process that was doing this was killed, leaving these _temporary files.
I take your point about _SUCCESS being optional.
Can you explain how DirectParquetOutputCommitter might address this?
If it doesn't, perhaps introducing a property that will check for the presence of _SUCCESS if set?
Thoughts?
> MetadataCache.refresh does not take into account _SUCCESS
> ---------------------------------------------------------
>
> Key: SPARK-7755
> URL: https://issues.apache.org/jira/browse/SPARK-7755
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Rowan Chattaway
> Priority: Minor
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> When you make a call to sqlc.parquetFile(path) where that path contains partially written files, then refresh will fail in strange ways when it attempts to read footer files.
> I would like to adjust the file discovery to take into account the presence of _SUCCESS and therefore only attempt to ready is we have the success marker.
> I have made the changes locally and it doesn't appear to have any side effects.
> What are peoples thoughts about this?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org