Posted to user@spark.apache.org by Grega Kešpret <gr...@celtra.com> on 2013/10/13 01:51:29 UTC

Large input file problem

Hi,

I'm getting Java OOM errors (heap space, GC overhead limit exceeded), "Futures timed out
after [10000] milliseconds", "removing BlockManager with no recent heartbeat", etc. I have
narrowed the cause down to a big input file from S3. I'm trying to make Spark split this
file into several smaller chunks, so that each chunk fits in memory, but I'm out of luck.

I have tried:
- passing a minSplits value greater than 1 to sc.textFile
- increasing the numPartitions argument to groupByKey
- using coalesce with numPartitions greater than 1 and shuffle = true

Basically my flow is like this:
val input = sc.textFile("s3n://.../input.gz", minSplits)
input
  .mapPartitions(lines => lines.map(l => (key, l)))
  .groupByKey(numPartitions)
  .map(...)
  .saveAsTextFile(...)

If I do input.toDebugString, I always see 1 partition (even when minSplits is greater
than 1). It seems like Spark is trying to ingest the whole input at once. When I manually
split the file into several smaller ones, the job progressed successfully, and
input.toDebugString showed 10 partitions in the case of 10 files.
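
For reference, a rough sketch of the manual workaround (the glob pattern below is made up);
reading the pre-split files gives one partition per file:

val parts = sc.textFile("s3n://.../input-part-*.gz")   // hypothetical names for the 10 smaller files
parts.toDebugString                                    // shows 10 partitions for 10 files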

Thanks,

Grega

Re: Large input file problem

Posted by Grega Kešpret <gr...@celtra.com>.
Ah, I think I finally got this. Spark v0.8.0-incubating uses Hadoop 1.0.4 by default.
I needed to compile it with "SPARK_HADOOP_VERSION=1.1.0 sbt/sbt assembly", since
splittable bzip2 support is only available from Hadoop 1.1.0 onwards.

http://hadoop.apache.org/releases.html#13+October%2C+2012%3A+Release+1.1.0+available
lists "Splittable bzip2 files" under bug fixes.
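
With the rebuilt assembly, the same check as before should then show more than one
partition (untested sketch, same elided path as before):

scala> val logs = sc.textFile("s3n://.../input.bz2", 10)
scala> logs.toDebugString   // should now report more than 1 partition for the HadoopRDD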

Grega

Re: Large input file problem

Posted by Grega Kešpret <gr...@celtra.com>.
I've tried using bzip2, but even with this method, when I do
sc.textFile("s3n://.../input.bz2", minSplits) with any value of minSplits greater than 1,
it still doesn't read the file into more than one partition:

scala> val logs = sc.textFile("s3n://.../input.bz2", 10)
scala> logs.toDebugString
13/10/14 09:55:42 INFO mapred.FileInputFormat: Total input paths to process : 1
res12: String =
MappedRDD[287] at textFile at <console>:21 (1 partitions)
  HadoopRDD[286] at textFile at <console>:21 (1 partitions)

I'm using Spark v0.8.0-incubating.
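
In case it matters, one way to double-check which Hadoop version the assembly was built
against should be Hadoop's VersionInfo (assuming it is on the shell classpath):

scala> org.apache.hadoop.util.VersionInfo.getVersion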



Grega

Re: Large input file problem

Posted by Grega Kešpret <gr...@celtra.com>.
Thanks a lot, I didn't know this. If I use some other compression format that supports
splitting (like bzip2), do I get decompression for free when I do sc.textFile (like with
gzipped files)?
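
In other words, I'm hoping the codec is picked from the file extension and the call stays
the same; a made-up example of what I mean:

val logs = sc.textFile("s3n://.../input.bz2")   // hoping this gets decompressed transparently, like a .gz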

Grega

Re: Large input file problem

Posted by Mark Hamstra <ma...@clearstorydata.com>.
The basic problem that you are running into is that a gzipped file is not splittable:
<https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-4/compression#8ca1fda1252b67145680b3a5e9d45b2a>
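
As a consequence, a single .gz input always comes in as one partition no matter what
minSplits you pass. Roughly (using the path from your mail), the best you can do without
changing the format is to shuffle the data out after reading, but the decompression itself
still runs in a single task:

val gz = sc.textFile("s3n://.../input.gz")      // gzip is not splittable, so 1 partition
val spread = gz.coalesce(10, shuffle = true)    // spreads the lines across 10 partitions after a full shuffle

Otherwise, pre-split the file or switch to a splittable codec such as bzip2.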

