Posted to issues@spark.apache.org by "Alex Slusarenko (JIRA)" <ji...@apache.org> on 2015/07/29 13:51:05 UTC

[jira] [Comment Edited] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

    [ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913 ] 

Alex Slusarenko edited comment on SPARK-1239 at 7/29/15 11:50 AM:
------------------------------------------------------------------

Hi all. We have hit this issue many times. Our current job has 250,000 map tasks and the same number of reduce tasks, running on 200 slave nodes; the driver has 80 GB of RAM. First we observed an Akka frame size limit exception, and after increasing that limit we now hit an OOM on the driver instead. Here is the corresponding part of the log:
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
        at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
...
{noformat}
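
To put the numbers in context: the map output status table scales with (number of map tasks) x (number of reduce partitions), i.e. roughly 262,144 x 262,144, or about 6.9e10 map/reduce pairs here, which is why the serialized statuses already weigh in at the ~166 MB reported above and why shipping the full set to every reducer strains both the Akka frame size limit and the driver heap. For reference, the kind of change we mean by "increasing the limit" is sketched below; spark.akka.frameSize takes a value in MB in Spark 1.x, and the exact numbers here are illustrative, not our production settings.

{noformat}
// Illustrative sketch only: the app name and values are hypothetical,
// not the exact settings used in the job described above.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("large-shuffle-job")
  // Raise the Akka frame size (in MB) so a large map-status payload
  // fits in a single message; the Spark 1.x default is far smaller.
  .set("spark.akka.frameSize", "512")
  // Driver heap; in practice this is usually passed as
  // --driver-memory 80g to spark-submit rather than set here.
  .set("spark.driver.memory", "80g")

val sc = new SparkContext(conf)
{noformat}

Raising the frame size only moves the bottleneck, though; as this ticket proposes, the real fix is to stop sending the full status set to every reducer.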

Do you need any additional info?


> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task. 
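
As a rough illustration of the first proposal (a fetch that takes both a map task and a reduce partition instead of returning every status), something along these lines could work; the names below are hypothetical and do not correspond to Spark's actual MapOutputTracker API:

{noformat}
// Hypothetical sketch of a scoped map-output lookup, as the proposal above
// suggests. None of these names are Spark's real internal API.
case class MapOutputLocation(host: String, port: Int, sizeBytes: Long)

trait ScopedMapOutputFetcher {
  // Look up the output location and size for one (map task, reduce partition)
  // pair, instead of fetching all statuses for the shuffle at once.
  def getMapOutput(shuffleId: Int, mapId: Int, reduceId: Int): MapOutputLocation
}
{noformat}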


