Posted to user@spark.apache.org by Harel Gliksman <ha...@gmail.com> on 2018/09/07 14:34:49 UTC

Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Hi,

We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge
nodes (60 GB RAM, 8 vcores, 160 GB SSD each). Driver memory is set to 25 GB.

It processes ~40 TB of data using aggregateByKey in which we specify
numPartitions = 300,000.
Map side tasks succeed, but reduce side tasks all fail.

We notice the following driver error:

18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null

 java.lang.OutOfMemoryError

	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
	at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
	at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
	at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
	at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
	at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
	at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
	at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
	at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
	at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
	at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
	at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
	at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
	at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.OutOfMemoryError
		at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
		at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
		at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
		at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
		at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
		at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
		at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
		at java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
		at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
		at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
		at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
		at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
		at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
		... 6 more

We found this reference (https://issues.apache.org/jira/browse/SPARK-1239)
to a similar issue that was closed in 2016.

Please advise,

Harel.

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Jacob Lynn <ab...@gmail.com>.
Thanks for the pointer, Vadim. However, I just tried it with Spark 2.4 and
got the same failure. (I was previously testing with 2.2 and/or 2.3.) And I
don't see this particular issue referred to there. The ticket that Harel
commented on indeed appears to be the most similar one to this issue:
https://issues.apache.org/jira/browse/SPARK-1239.

On Mon, Nov 11, 2019 at 4:43 PM Vadim Semenov <va...@datadoghq.com> wrote:

> There's an umbrella ticket for various 2GB limitations
> https://issues.apache.org/jira/browse/SPARK-6235
>
> On Fri, Nov 8, 2019 at 4:11 PM Jacob Lynn <ab...@gmail.com> wrote:
> >
> > Sorry for the noise, folks! I understand that reducing the number of
> partitions works around the issue (at the scale I'm working at, anyway) --
> as I mentioned in my initial email -- and I understand the root cause. I'm
> not looking for advice on how to resolve my issue. I'm just pointing out
> that this is a real bug/limitation that impacts real-world use cases, in
> case there is some proper Spark dev out there who is looking for a problem
> to solve.
> >
> > On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov <va...@datadoghq.com.invalid>
> wrote:
> >>
> >> Basically, the driver tracks partitions and sends them over to the
> >> executors, so what it's trying to do is serialize and compress the map,
> >> but because it's so big it goes over 2 GiB, which is Java's limit on the
> >> max size of byte arrays, so the whole thing fails.
> >>
> >> The size of the data doesn't matter much here; the number of partitions
> >> is the root cause of the issue. Try reducing it below 30000 and see how
> >> it goes.
> >>
> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <ha...@gmail.com>
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
> r3.2xlarge (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
> >> >
> >> > It processes ~40 TB of data using aggregateByKey in which we specify
> numPartitions = 300,000.
> >> > Map side tasks succeed, but reduce side tasks all fail.
> >> >
> >> > We found this reference (
> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that
> was closed in 2016.
> >> >
> >> > Please advise,
> >> >
> >> > Harel.
> >> >
> >> >
> >>
> >>
> >> --
> >> Sent from my iPhone
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>
>
>
> --
> Sent from my iPhone
>

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Vadim Semenov <va...@datadoghq.com.INVALID>.
There's an umbrella ticket for various 2GB limitations
https://issues.apache.org/jira/browse/SPARK-6235

On Fri, Nov 8, 2019 at 4:11 PM Jacob Lynn <ab...@gmail.com> wrote:
>
> Sorry for the noise, folks! I understand that reducing the number of partitions works around the issue (at the scale I'm working at, anyway) -- as I mentioned in my initial email -- and I understand the root cause. I'm not looking for advice on how to resolve my issue. I'm just pointing out that this is a real bug/limitation that impacts real-world use cases, in case there is some proper Spark dev out there who is looking for a problem to solve.
>
> On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov <va...@datadoghq.com.invalid> wrote:
>>
>> Basically, the driver tracks partitions and sends them over to the
>> executors, so what it's trying to do is serialize and compress the map,
>> but because it's so big it goes over 2 GiB, which is Java's limit on the
>> max size of byte arrays, so the whole thing fails.
>>
>> The size of the data doesn't matter much here; the number of partitions
>> is the root cause of the issue. Try reducing it below 30000 and see how
>> it goes.
>>
>> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <ha...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >
>> > It processes ~40 TB of data using aggregateByKey in which we specify numPartitions = 300,000.
>> > Map side tasks succeed, but reduce side tasks all fail.
>> >
>> > We found this reference (https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that was closed in 2016.
>> >
>> > Please advise,
>> >
>> > Harel.
>> >
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>


-- 
Sent from my iPhone

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Jacob Lynn <ab...@gmail.com>.
Sorry for the noise, folks! I understand that reducing the number of
partitions works around the issue (at the scale I'm working at, anyway) --
as I mentioned in my initial email -- and I understand the root cause. I'm
not looking for advice on how to resolve my issue. I'm just pointing out
that this is a real bug/limitation that impacts real-world use cases, in
case there is some proper Spark dev out there who is looking for a problem
to solve.

On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov <va...@datadoghq.com.invalid>
wrote:

> Basically, the driver tracks partitions and sends them over to the
> executors, so what it's trying to do is serialize and compress the map,
> but because it's so big it goes over 2 GiB, which is Java's limit on the
> max size of byte arrays, so the whole thing fails.
>
> The size of the data doesn't matter much here; the number of partitions
> is the root cause of the issue. Try reducing it below 30000 and see how
> it goes.
>
> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <ha...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge
> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
> >
> > It processes ~40 TB of data using aggregateByKey in which we specify
> numPartitions = 300,000.
> > Map side tasks succeed, but reduce side tasks all fail.
> >
> > We found this reference (
> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that
> was closed in 2016.
> >
> > Please advise,
> >
> > Harel.
> >
> >
>
>
> --
> Sent from my iPhone
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Vadim Semenov <va...@datadoghq.com.INVALID>.
Basically, the driver tracks partitions and sends them over to the
executors, so what it's trying to do is serialize and compress the map,
but because it's so big it goes over 2 GiB, which is Java's limit on the
max size of byte arrays, so the whole thing fails.

The size of the data doesn't matter much here; the number of partitions
is the root cause of the issue. Try reducing it below 30000 and see how
it goes.
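
For concreteness, a minimal sketch of that change (the RDD name, key/value
types, and aggregation functions below are hypothetical stand-ins; the real
job isn't shown in this thread):

    import org.apache.spark.rdd.RDD

    // Sketch only: `records` stands in for the real ~40 TB pair RDD.
    def aggregate(records: RDD[(String, Long)]): RDD[(String, Long)] =
      // Fewer reduce partitions keeps the gzipped array of map statuses that
      // the driver builds in MapOutputTracker.serializeMapStatuses under the
      // ~2 GiB Java byte-array limit seen in the stack traces above.
      records.aggregateByKey(0L, 20000)( // was numPartitions = 300,000
        (acc, v) => acc + v,             // fold a value into the accumulator
        (a, b) => a + b                  // merge per-partition accumulators
      )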

On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <ha...@gmail.com> wrote:
>
> Hi,
>
> We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>
> It processes ~40 TB of data using aggregateByKey in which we specify numPartitions = 300,000.
> Map side tasks succeed, but reduce side tasks all fail.
>
> We found this reference (https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that was closed in 2016.
>
> Please advise,
>
> Harel.
>
>


-- 
Sent from my iPhone

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Jacob Lynn <ab...@gmail.com>.
File system is HDFS. Executors are 2 cores, 14 GB RAM. But I don't think
either of these relates to the problem -- this is a memory allocation issue
on the driver side, and it happens in an intermediate stage that has no HDFS
read/write.

On Fri, Nov 8, 2019 at 10:01 AM Spico Florin <sp...@gmail.com> wrote:

> Hi!
> What file system are you using: EMRFS or HDFS?
> Also, what memory are you using for the reducer?
>
> On Thu, Nov 7, 2019 at 8:37 PM abeboparebop <ab...@gmail.com>
> wrote:
>
>> I ran into the same issue processing 20TB of data, with 200k tasks on both
>> the map and reduce sides. Reducing to 100k tasks each resolved the issue.
>> But this could/would be a major problem in cases where the data is bigger
>> or
>> the computation is heavier, since reducing the number of partitions may
>> not
>> be an option.
>>
>>
>> harelglik wrote
>> > I understand the error is because the number of partitions is very high,
>> > yet when processing 40 TB (and this number is expected to grow) this number
>> > seems reasonable: 40 TB / 300,000 will result in a partition size of
>> > ~130 MB (data should be evenly distributed).
>> >
>> > On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov <vadim@...> wrote:
>> >
>> >> You have too many partitions, so when the driver is trying to gather
>> >> the status of all map outputs and send back to executors it chokes on
>> >> the size of the structure that needs to be GZipped, and since it's
>> >> bigger than 2GiB, it produces OOM.
>> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <harelglik@...> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
>> >> r3.2xlarge
>> >> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >> >
>> >> > It processes ~40 TB of data using aggregateByKey in which we specify
>> >> numPartitions = 300,000.
>> >> > Map side tasks succeed, but reduce side tasks all fail.
>> >> >
>> >> > We found this reference (
>> >> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue
>> that
>> >> was closed in 2016.
>> >> >
>> >> > Please advise,
>> >> >
>> >> > Harel.
>> >> >
>> >> >
>> >>
>> >>
>> >> --
>> >> Sent from my iPhone
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Spico Florin <sp...@gmail.com>.
Hi!
What file system are you using: EMRFS or HDFS?
Also, what memory are you using for the reducer?

On Thu, Nov 7, 2019 at 8:37 PM abeboparebop <ab...@gmail.com> wrote:

> I ran into the same issue processing 20TB of data, with 200k tasks on both
> the map and reduce sides. Reducing to 100k tasks each resolved the issue.
> But this could/would be a major problem in cases where the data is bigger
> or
> the computation is heavier, since reducing the number of partitions may not
> be an option.
>
>
> harelglik wrote
> > I understand the error is because the number of partitions is very high,
> > yet when processing 40 TB (and this number is expected to grow) this number
> > seems reasonable: 40 TB / 300,000 will result in a partition size of
> > ~130 MB (data should be evenly distributed).
> >
> > On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov <vadim@...> wrote:
> >
> >> You have too many partitions, so when the driver is trying to gather
> >> the status of all map outputs and send back to executors it chokes on
> >> the size of the structure that needs to be GZipped, and since it's
> >> bigger than 2GiB, it produces OOM.
> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <harelglik@...> wrote:
> >> >
> >> > Hi,
> >> >
> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
> >> r3.2xlarge
> >> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
> >> >
> >> > It processes ~40 TB of data using aggregateByKey in which we specify
> >> numPartitions = 300,000.
> >> > Map side tasks succeed, but reduce side tasks all fail.
> >> >
> >> > We found this reference (
> >> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue
> that
> >> was closed in 2016.
> >> >
> >> > Please advise,
> >> >
> >> > Harel.
> >> >
> >> >
> >>
> >>
> >> --
> >> Sent from my iPhone
> >>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by abeboparebop <ab...@gmail.com>.
I ran into the same issue processing 20TB of data, with 200k tasks on both
the map and reduce sides. Reducing to 100k tasks each resolved the issue.
But this could/would be a major problem in cases where the data is bigger or
the computation is heavier, since reducing the number of partitions may not
be an option.
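
For illustration, a hedged sketch of that change, halving both sides from
200k to 100k tasks (the RDD and aggregation below are hypothetical; the
actual job isn't shown here):

    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for the 20 TB job described above.
    def aggregateWithFewerTasks(raw: RDD[(String, Long)]): RDD[(String, Long)] = {
      // The map-side task count follows the parent RDD's partitioning;
      // coalesce (without shuffle) narrows it from ~200k to 100k partitions.
      val coarser = raw.coalesce(100000)
      // The reduce-side task count is the numPartitions passed to
      // aggregateByKey; both dimensions feed the driver's map-status bookkeeping.
      coarser.aggregateByKey(0L, 100000)(
        (acc, v) => acc + v,
        (a, b) => a + b
      )
    }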


harelglik wrote
> I understand the error is because the number of partitions is very high,
> yet when processing 40 TB (and this number is expected to grow) this number
> seems reasonable: 40 TB / 300,000 will result in a partition size of
> ~130 MB (data should be evenly distributed).
> 
> On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov <vadim@...> wrote:
> 
>> You have too many partitions, so when the driver is trying to gather
>> the status of all map outputs and send back to executors it chokes on
>> the size of the structure that needs to be GZipped, and since it's
>> bigger than 2GiB, it produces OOM.
>> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman <harelglik@...> wrote:
>> >
>> > Hi,
>> >
>> > We are running a Spark (2.3.1) job on an EMR cluster with 500
>> r3.2xlarge
>> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >
>> > It processes ~40 TB of data using aggregateByKey in which we specify
>> numPartitions = 300,000.
>> > Map side tasks succeed, but reduce side tasks all fail.
>> >
>> > We found this reference (
>> https://issues.apache.org/jira/browse/SPARK-1239) to a similar issue that
>> was closed in 2016.
>> >
>> > Please advise,
>> >
>> > Harel.
>> >
>> >
>>
>>
>> --
>> Sent from my iPhone
>>





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Harel Gliksman <ha...@gmail.com>.
I understand the error occurs because the number of partitions is very high,
yet when processing ~40 TB (a volume that is expected to grow) 300,000
partitions seems reasonable:
40 TB / 300,000 gives a partition size of ~130 MB (assuming the data is
evenly distributed).
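
As a quick back-of-the-envelope check of that figure (a minimal sketch in
Scala; the 40 TB and 300,000 values are the job parameters above, the
variable names are only illustrative):

    val inputBytes        = 40L * 1000 * 1000 * 1000 * 1000  // ~40 TB of input
    val numPartitions     = 300000L                           // numPartitions passed to aggregateByKey
    val bytesPerPartition = inputBytes / numPartitions        // ~133,000,000 bytes, i.e. ~130 MB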

On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov <va...@datadoghq.com> wrote:

> You have too many partitions, so when the driver tries to gather the
> status of all map outputs and send it back to the executors, it chokes
> on the size of the structure that needs to be GZipped, and since that
> structure is bigger than 2 GiB, it produces the OOM.

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

Posted by Vadim Semenov <va...@datadoghq.com>.
You have too many partitions, so when the driver tries to gather the
status of all map outputs and send it back to the executors, it chokes
on the size of the structure that needs to be GZipped, and since that
structure is bigger than 2 GiB, it produces the OOM.
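
To put rough numbers on that, here is a back-of-the-envelope sketch (the
map-task count is an assumption based on the ~130 MB splits discussed in
the thread, not a figure taken from this job):

    val numMapTasks         = 300000L  // assumption: ~40 TB read as ~130 MB splits
    val numReducePartitions = 300000L  // numPartitions passed to aggregateByKey
    // The driver keeps one MapStatus per map task, and each status describes the
    // output sizes of all reduce partitions in some compressed form, so the
    // structure that is serialized and GZipped grows roughly with the product:
    val trackedBlocks = numMapTasks * numReducePartitions  // ~9 * 10^10 shuffle blocks
    // Even at a small fraction of a byte per block after compression, the result
    // can exceed the ~2 GiB limit of a single Java byte array, which matches
    // ByteArrayOutputStream.hugeCapacity throwing the OutOfMemoryError above.

Cutting the number of reduce partitions shrinks that product directly, at the
cost of larger per-partition data on the reduce side.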


-- 
Sent from my iPhone

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org