Posted to user@spark.apache.org by Matthew Farrellee <ma...@redhat.com> on 2014/09/03 02:12:46 UTC

Re: [PySpark] large # of partitions causes OOM

On 08/29/2014 06:05 PM, Nick Chammas wrote:
> Here’s a repro for PySpark:
>
> a = sc.parallelize(["Nick", "John", "Bob"])
> a = a.repartition(24000)
> a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y).take(1)
>
> When I try this on an EC2 cluster with 1.1.0-rc2 and Python 2.7, this is
> what I get:
>
> >>> a = sc.parallelize(["Nick", "John", "Bob"])
> >>> a = a.repartition(24000)
> >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y).take(1)
> 14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
> 14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
> 14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
> 14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
> 14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
> 14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
> 14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
> java.lang.OutOfMemoryError: Java heap space
>      at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>      at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>      at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>      at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>      at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>      at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>      at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>      at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>      at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>      at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal, 54014)
> java.nio.channels.ClosedChannelException
>      at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>      at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>      at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>      at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>      at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
>      at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
>      at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
>      at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>      at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>      at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
>      at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>      at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
>      at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
>      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>      at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>      at java.lang.Thread.run(Thread.java:745)
> 14/08/29 21:54:43 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, ip-10-137-1-139.ec2.internal, 42539, 0) with no recent heart beats: 183978ms exceeds 45000ms
> 14/08/29 21:57:42 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-138-9-33.ec2.internal, 41924) not found
> 14/08/29 21:57:51 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(11, ip-10-236-181-116.ec2.internal, 46847, 0) with no recent heart beats: 178629ms exceeds 45000ms
> 14/08/29 21:57:43 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-10-137-1-139.ec2.internal, 42539) not found
> 14/08/29 21:57:54 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-141-136-168.ec2.internal, 42960)
> java.nio.channels.ClosedChannelException
>      at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
>      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
>      at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
>      at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>      at java.lang.Thread.run(Thread.java:745)
>
>
> Is this a bug? What’s going on here?
>
> Nick

I can't get this to fail, but maybe I have just enough memory.

I'm curious if it fails when you simply call a.collect() or
a.glom().collect().
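
For concreteness, a minimal sketch of that check (assuming the same
SparkContext `sc` and the repartitioned RDD from the repro above):

    # rebuild the repro RDD: 3 tiny elements spread across 24000 partitions
    a = sc.parallelize(["Nick", "John", "Bob"]).repartition(24000)

    # pull every partition's (mostly empty) result back to the driver;
    # if this alone OOMs, fetching 24000 task results is the problem,
    # not the keyBy/reduceByKey shuffle
    a.collect()

    # same, but returns one list per partition, making the per-partition
    # overhead visible in the result structure
    a.glom().collect()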

best,


matt

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org