Posted to issues@spark.apache.org by "Jay Goldman (JIRA)" <ji...@apache.org> on 2016/11/30 14:35:59 UTC

[jira] [Commented] (SPARK-18649) sc.textFile(my_file).collect() raises socket.timeout on large files

    [ https://issues.apache.org/jira/browse/SPARK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708747#comment-15708747 ] 

Jay Goldman commented on SPARK-18649:
-------------------------------------

Oops - wrong stack trace; here is the one I meant to include:

java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/11/30 09:31:07 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:275)
	at edu.mit.ll.bb.ingest.LogAsMaps$$anonfun$makeDataset$1.apply(LogAsMaps.scala:232)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at edu.mit.ll.bb.ingest.LogAsMaps$.makeDataset(LogAsMaps.scala:232)
	at edu.mit.ll.bb.ingest.LogAsMaps$.apply(LogAsMaps.scala:179)
	at edu.mit.ll.bb.ingest.LogAsMaps$.makeLog(LogAsMaps.scala:396)
	at edu.mit.ll.bb.app.BrowseApp$.delayedEndpoint$edu$mit$ll$bb$app$BrowseApp$1(BrowseApp.scala:103)
	at edu.mit.ll.bb.app.BrowseApp$delayedInit$body.apply(BrowseApp.scala:18)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at edu.mit.ll.bb.app.SingleEventFileApp.main(SingleEventFileApp.scala:6)
	at edu.mit.ll.bb.app.BrowseApp.main(BrowseApp.scala)
Caused by: java.util.NoSuchElementException: next on empty iterator
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Disconnected from the target VM, address: '127.0.0.1:64555', transport: 'socket'

> sc.textFile(my_file).collect() raises socket.timeout on large files
> -------------------------------------------------------------------
>
>                 Key: SPARK-18649
>                 URL: https://issues.apache.org/jira/browse/SPARK-18649
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>         Environment: PySpark version 1.6.2
>            Reporter: Erik Cederstrand
>
> I'm trying to load a file into the driver with this code:
>     contents = sc.textFile('hdfs://path/to/big_file.csv').collect()
> Loading into the driver instead of creating a distributed RDD is intentional in this case. The file is ca. 6 GB, and I have adjusted driver memory accordingly to fit the local data. After some time, my spark-submitted job crashes with the stack trace below.
> I have traced this to pyspark/rdd.py where the _load_from_socket() method creates a socket with a hard-coded timeout of 3 seconds (this code is also present in HEAD although I'm on PySpark 1.6.2). Raising this hard-coded value to e.g. 600 lets me read the entire file.
> Is there any reason that this value does not use e.g. the 'spark.network.timeout' setting instead?
> Traceback (most recent call last):
>   File "my_textfile_test.py", line 119, in <module>
>     contents = sc.textFile('hdfs://path/to/file.csv').collect()
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 517, in load_stream
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 511, in loads
>   File "/usr/lib/python2.7/socket.py", line 380, in read
>     data = self._sock.recv(left)
> socket.timeout: timed out
> 16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe
> java.net.SocketException: Broken pipe
> 	at java.net.SocketOutputStream.socketWrite0(Native Method)
> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> 	at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
> 	Suppressed: java.net.SocketException: Broken pipe
> 		at java.net.SocketOutputStream.socketWrite0(Native Method)
> 		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 		at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> 		... 3 more
> 16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset
> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> 	at java.io.DataOutputStream.write(DataOutputStream.java:107)
> 	at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> 	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
> 	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
> 	Suppressed: java.net.SocketException: Broken pipe
> 		at java.net.SocketOutputStream.socketWrite0(Native Method)
> 		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 		at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 		at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 		at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
> 		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
> 		... 1 more
> 		Suppressed: java.net.SocketException: Broken pipe
> 			at java.net.SocketOutputStream.socketWrite0(Native Method)
> 			at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 			at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 			at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 			at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 			at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 			at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> 			... 3 more
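For reference, a minimal Python sketch of the change suggested in the quoted report (deriving the collect() socket timeout from the Spark configuration instead of hard-coding 3 seconds) might look roughly like the following. This is not the actual pyspark/rdd.py code: open_result_socket and _parse_seconds are illustrative names only, the conf argument is assumed to be the driver's pyspark.SparkConf, and the '120s' fallback simply mirrors the documented default of spark.network.timeout.

    import socket

    def _parse_seconds(value, default=120):
        # spark.network.timeout is normally written like "120s"; fall back
        # to the assumed default if the value is missing or unparseable.
        try:
            return int(str(value).rstrip("s"))
        except (TypeError, ValueError):
            return default

    def open_result_socket(port, conf):
        # 'conf' is assumed to be the driver's pyspark.SparkConf.
        timeout = _parse_seconds(conf.get("spark.network.timeout", "120s"))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)  # instead of a hard-coded sock.settimeout(3)
        sock.connect(("localhost", port))
        return sock

Until something along these lines is in place, the workaround described in the report (raising the hard-coded timeout in _load_from_socket to a larger value such as 600) remains the practical option when collect() has to pull a multi-gigabyte file into the driver.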


