Posted to issues@spark.apache.org by "Cheng Hao (JIRA)" <ji...@apache.org> on 2015/08/14 04:19:45 UTC

[jira] [Commented] (SPARK-9879) OOM in LIMIT clause with large number

    [ https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696338#comment-14696338 ] 

Cheng Hao commented on SPARK-9879:
----------------------------------

I created a new physical operator called LargeLimit, which takes effect when the number in the LIMIT clause is greater than or equal to SqlConf.LIMIT_ROWS.

LargeLimit triggers execution of the child RDD and persists its result, and then we iterate over the persisted data twice. The first pass counts the records in each partition, from which we compute how many records to take from each partition so that the totals add up to the requested limit; the second pass simply takes that many records from each partition (see the sketch below).

The main advantages of this approach:
- No shuffle to a single node is required (in fact, no data shuffle at all), and the result stays distributed.
- It keeps the same output partitioning as its child.
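
To make the two-pass plan concrete, here is a minimal sketch in plain RDD terms. The LargeLimitSketch object, the largeLimit helper, and the left-to-right fill policy are illustrative assumptions only, not the actual operator or its record-allocation strategy:

{code}
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object LargeLimitSketch {
  // Hypothetical helper illustrating the two-pass idea; not the LargeLimit operator itself.
  def largeLimit[T: ClassTag](child: RDD[T], limit: Long): RDD[T] = {
    // Persist the child once so both passes read the same data without recomputation.
    val cached = child.persist(StorageLevel.MEMORY_AND_DISK)

    // Pass 1: count the records in each partition.
    val counts: Array[Long] = cached
      .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size.toLong)))
      .collect()
      .sortBy(_._1)
      .map(_._2)

    // Decide how many records to take from each partition so the per-partition
    // takes sum to `limit` (here: fill partitions from left to right).
    var remaining = limit
    val takes: Array[Long] = counts.map { c =>
      val t = math.min(c, remaining)
      remaining -= t
      t
    }

    // Pass 2: take exactly the computed number of records from each partition.
    // No shuffle is involved and the child's partitioning is preserved.
    cached.mapPartitionsWithIndex((idx, iter) => iter.take(takes(idx).toInt))
  }
}
{code}

Filling partitions from left to right keeps the sketch trivial; a real operator could instead spread the takes across partitions, but either way no shuffle is needed.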

> OOM in LIMIT clause with large number
> -------------------------------------
>
>                 Key: SPARK-9879
>                 URL: https://issues.apache.org/jira/browse/SPARK-9879
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Cheng Hao
>
> {code}
> create table spark.tablsetest as select * from dpa_ord_bill_tf order by member_id limit 20000000;
> {code}
>          
> {code}
> spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options -XX:PermSize=1024M -XX:MaxPermSize=2048M
> Error logs
> 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
> at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.close(Output.java:165)
> at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162)
> at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139)
> at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
> at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
> at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
> 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
> at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.close(Output.java:165)
> at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162)
> at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139)
> at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
> at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
> at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 15/07/27 10:22:46 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveBroadcast(2,true)] in 1 attempts
> akka.pattern.AskTimeoutException: Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
> 15/07/27 10:22:49 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveBroadcast(2,true)] in 2 attempts
> akka.pattern.AskTimeoutException: Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:52 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveBroadcast(2,true)] in 3 attempts
> akka.pattern.AskTimeoutException: Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:55 ERROR ContextCleaner: Error cleaning broadcast 2
> org.apache.spark.SparkException: Error sending message [message = RemoveBroadcast(2,true)]
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> Caused by: akka.pattern.AskTimeoutException: Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> ... 13 more
> {code}


