Posted to user@spark.apache.org by Jim Carroll <ji...@gmail.com> on 2014/12/16 19:22:41 UTC

Spark handling of a file://xxxx.gz Uri

Is there a way to get Spark to NOT repartition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a "transformation" and not
an "action" (if the analogy is apt). There is no need to fully create and
fill out an intermediate RDD with the expanded data when it can be done one
row at a time.
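
To make the question concrete, here is roughly the shape of what I'm doing
(the path and the transformation are made up):

    // a single .gz is not splittable, so it comes in as one partition
    val lines = sc.textFile("file:///data/huge-dump.gz")

    // what I'd like: the gzip stream decompressed lazily, line by line, as the
    // action runs, rather than being expanded into an intermediate RDD first
    val n = lines.filter(_.nonEmpty).count()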




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file-xxxx-gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark handling of a file://xxxx.gz Uri

Posted by Jim <ji...@gmail.com>.
Hi Harry,

Thanks for your response.

I'm working in Scala. When I call "count", the RDD gets expanded inside 
that count (since count is an action). You can see the call stack that 
results in the failure of the job here:

  ERROR DiskBlockObjectWriter - Uncaught exception while reverting partial writes to file /tmp/spark-local-20141216170458-964a/1d/temp_shuffle_4f46af09-5521-4fc6-adb1-c72839520560
java.io.IOException: No space left on device
     at java.io.FileOutputStream.writeBytes(Native Method)
     at java.io.FileOutputStream.write(FileOutputStream.java:345)
     at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
     at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
     at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
     at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:263)
     at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
     at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718)
     at org.apache.spark.serializer.JavaSerializationStream.flush(JavaSerializer.scala:51)
     at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:173)
     at org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:774)
     at org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:773)
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
     at org.apache.spark.util.collection.ExternalSorter.stop(ExternalSorter.scala:773)
     at org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:93)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:74)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     at org.apache.spark.scheduler.Task.run(Task.scala:56)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)

Notice that the task run (this is now doing a "count") results in a shuffle, 
during which the intermediate RDD is written to disk, and the write fails 
when the disk fills up. This intermediate RDD/disk write is unnecessary.
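
For what it's worth, the temp_shuffle_* spill file above lands wherever 
spark.local.dir points, so something like the following (made-up path) at 
least moves the writes onto a bigger volume, though it doesn't get rid of 
the extra shuffle itself:

    import org.apache.spark.{SparkConf, SparkContext}

    // point shuffle spill files at a larger disk (path is hypothetical);
    // a workaround for the "No space left on device" failure, not for the shuffle
    val conf = new SparkConf()
      .setAppName("gz-count")
      .set("spark.local.dir", "/mnt/bigdisk/spark-tmp")
    val sc = new SparkContext(conf)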

I even implemented a "Seq[String]" in terms of streaming the file and 
called sc.parallelize(mySequence, 1), and THIS results in a call to 
"toArray" on my sequence. Since the expanded data won't fit on disk, it 
certainly won't fit in an in-memory array.
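
In case it's useful, this is roughly what I tried (a sketch only; my real 
Seq implementation is a little more involved, and the path is made up):

    import java.io.FileInputStream
    import java.util.zip.GZIPInputStream
    import scala.io.Source

    // a lazy, one-pass Seq view over the decompressed lines
    val lines: Seq[String] = Source.fromInputStream(
      new GZIPInputStream(new FileInputStream("/data/huge-dump.gz"))).getLines().toStream

    // parallelize ends up calling toArray on the Seq, so the whole expanded
    // file is materialized on the driver anyway
    val rdd = sc.parallelize(lines, 1)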

Thanks for taking the time to respond.

Jim

On 12/16/2014 04:57 PM, Harry Brundage wrote:
> Are you certain that's happening, Jim? Why? What happens if you just do 
> sc.textFile(fileUri).count() ? If I'm not mistaken, the Hadoop 
> InputFormat for gzip and the RDD wrapper around it already have the 
> "streaming" behaviour you wish for, but I could be wrong. Also, are 
> you in PySpark or Scala Spark?
>
> On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll <jimfcarroll@gmail.com 
> <ma...@gmail.com>> wrote:
>
>     Is there a way to get Spark to NOT repartition/shuffle/expand a
>     sc.textFile(fileUri) when the URI is a gzipped file?
>
>     Expanding a gzipped file should be thought of as a
>     "transformation" and not
>     an "action" (if the analogy is apt). There is no need to fully
>     create and
>     fill out an intermediate RDD with the expanded data when it can be
>     done one
>     row at a time.
>


Re: Spark handling of a file://xxxx.gz Uri

Posted by Harry Brundage <ha...@shopify.com>.
Are you certain that's happening, Jim? Why? What happens if you just do
sc.textFile(fileUri).count() ? If I'm not mistaken, the Hadoop InputFormat
for gzip and the RDD wrapper around it already have the "streaming"
behaviour you wish for, but I could be wrong. Also, are you in PySpark or
Scala Spark?
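
To be concrete about what I mean (paths made up): a bare count like the
first line below shouldn't have a shuffle stage at all, since a single gzip
is read as one lazily-decompressed partition; it's something like the second
line, with an explicit repartition (or another wide dependency), that would
produce the sort-shuffle spill in your stack trace:

    // no shuffle expected: one gzip => one partition, decompressed as it's read
    sc.textFile("file:///data/huge-dump.gz").count()

    // this, on the other hand, does shuffle and will spill under spark.local.dir
    sc.textFile("file:///data/huge-dump.gz").repartition(8).count()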

On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll <ji...@gmail.com> wrote:
>
> Is there a way to get Spark to NOT repartition/shuffle/expand a
> sc.textFile(fileUri) when the URI is a gzipped file?
>
> Expanding a gzipped file should be thought of as a "transformation" and not
> an "action" (if the analogy is apt). There is no need to fully create and
> fill out an intermediate RDD with the expanded data when it can be done one
> row at a time.
>