Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:54 UTC

[jira] [Resolved] (SPARK-18650) race condition in FileScanRDD.scala

     [ https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-18650.
----------------------------------
    Resolution: Incomplete

> race condition in FileScanRDD.scala
> -----------------------------------
>
>                 Key: SPARK-18650
>                 URL: https://issues.apache.org/jira/browse/SPARK-18650
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: scala 2.11
> macos 10.11.6
>            Reporter: Jay Goldman
>            Priority: Major
>              Labels: bulk-closed
>
> I am attempting to create a Dataset from a single CSV file:
>  val ss: SparkSession = ....
>  val ddr = ss.read.option("path", path)
> ... (choose between xml vs csv parsing)
>  var df = ddr.option("sep", ",")
>           .option("quote", "\"")
>           .option("escape", "\"") // want to retain backslashes (\) ...
>           .option("delimiter", ",")
>           .option("comment", "#")
>           .option("header", "true")
>           .option("format", "csv")
>           .csv(path)
> df.count() returns twice the number of lines in the CSV file, i.e., each line of the input file shows up as 2 rows in df.
> Moreover, df.distinct.count returns the correct number of rows.
> There appears to be a problem in FileScanRDD.compute. I am using Spark version 2.0.1 with Scala 2.11. I am not going to include the entire contents of FileScanRDD.scala here.
> In FileScanRDD.compute there is the following:
>  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
> If I put a breakpoint in either FileScanRDD.compute or FileScanRDD.nextIterator, the resulting dataset has the correct number of rows.
> Moreover, the code in FileScanRDD.scala is:
> private def nextIterator(): Boolean = {
>         updateBytesReadWithFileSize()
>         if (files.hasNext) { // breakpoint here => works
>           currentFile = files.next() // breakpoint here => fails
>           ....
>         }
>         else { .... }
> ....
> }
> If I put a breakpoint on the files.hasNext line, all is well; however, if I put a breakpoint on the files.next() line, the code fails when I continue because the files iterator has become empty (see stack trace below). Disabling the breakpoint results in a Dataset with each line of the CSV file duplicated.
> So it appears that multiple threads are using the files iterator or the underlying split value (an RDDPartition), and, timing-wise, on my system 2 workers wind up processing the same file, with the resulting Dataset having 2 copies of each of the input lines.
> This code is not active when parsing an XML file.
> Here is the stack trace:
> java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 16/11/30 09:31:07 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
> 	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> 	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
> 	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> 	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
> 	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
> 	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
> 	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
> 	at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:275)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:232)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.makeDataset(LogAsMaps.scala:232)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.apply(LogAsMaps.scala:179)
> 	at edu.mit.ll.bb.ingest.LogAsMaps$.makeLog(LogAsMaps.scala:396)
> 	at edu.mit.ll.bb.app.BrowseApp$.delayedEndpoint$edu$mit$ll$bb$app$BrowseApp$1(BrowseApp.scala:103)
> 	at edu.mit.ll.bb.app.BrowseApp$delayedInit$body.apply(BrowseApp.scala:18)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> 	at scala.App$class.main(App.scala:76)
> 	at edu.mit.ll.bb.app.SingleEventFileApp.main(SingleEventFileApp.scala:6)
> 	at edu.mit.ll.bb.app.BrowseApp.main(BrowseApp.scala)
> Caused by: java.util.NoSuchElementException: next on empty iterator
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> 	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> 	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Disconnected from the target VM, address: '127.0.0.1:64555', transport: 'socket'
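
The failure pattern the reporter describes, where hasNext succeeds and the following next() finds the iterator empty, can be illustrated outside Spark. The sketch below is plain Scala and is not taken from FileScanRDD; the names CheckThenTake, nextFile, and between are hypothetical, and the "second consumer" is simulated on one thread so the result is deterministic. It shows only that a check-then-take on a shared iterator can fail this way. It does not establish that Spark shares the iterator between threads, and it does not by itself account for the duplicated rows.

```scala
// Illustrative only: plain Scala, not Spark's FileScanRDD code.
// `CheckThenTake`, `nextFile`, and `between` are hypothetical names.
object CheckThenTake {
  // Same shape as the quoted nextIterator(): test hasNext, then call next().
  // `between` stands in for any other consumer of the same iterator that
  // runs after the check but before the take.
  def nextFile(files: Iterator[String], between: () => Unit): Either[String, String] =
    if (files.hasNext) {
      between()
      try Right(files.next())
      catch { case _: NoSuchElementException => Left("iterator emptied after hasNext") }
    } else Left("no more files")

  def main(args: Array[String]): Unit = {
    // Single consumer: the check and the take agree.
    val alone = Array("part-0.csv").iterator
    println(nextFile(alone, () => ()))

    // A second consumer drains the same iterator between the check and the take.
    val shared = Array("part-0.csv").iterator
    println(nextFile(shared, () => { shared.next(); () }))
  }
}
```

In the first call the element is returned as a Right; in the second, next() throws NoSuchElementException and the sketch reports it as a Left. A debugger that evaluates expressions on the iterator while stopped at a breakpoint could play the role of `between` here, which may be worth ruling out before concluding that two task threads are involved.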



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
