Posted to user@spark.apache.org by Yifan LI <ia...@gmail.com> on 2015/10/29 12:52:15 UTC

[Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

Hey,

I was just trying to scan a large RDD (sortedRdd, ~1 billion elements) using the toLocalIterator API, but an exception was thrown when the scan was almost finished:

java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
	at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
	at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)

	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Do you have any idea?

I have already set the number of partitions quite high, e.g. 40000.
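
The call pattern is roughly the following (an illustrative sketch with placeholder names and paths, not the exact code):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch: build a large pair RDD, sort it, and scan it on the
// driver. The input path and key extraction are placeholders.
val sc = new SparkContext(new SparkConf().setAppName("scan-sorted-rdd"))
val rdd = sc.textFile("hdfs:///path/to/input").map(line => (line.hashCode, line))

// ~1 billion elements sorted into 40000 partitions.
val sortedRdd = rdd.sortBy(_._1, ascending = true, numPartitions = 40000)

// toLocalIterator pulls one partition at a time to the driver; the failure
// above happens while an executor serves a cached block larger than
// Integer.MAX_VALUE bytes (~2.1 GB), which cannot be memory-mapped.
sortedRdd.toLocalIterator.foreach(println)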


Best,
Yifan LI






Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

Posted by Yifan LI <ia...@gmail.com>.
Thanks Deng,

Yes, I agree that a partition larger than 2GB is what caused this
exception.

But in my case, simply increasing the number of partitions in the sortBy
operation did not help.

I think the partitioning produced by sortBy is not balanced: my dataset
contains large numbers of elements sharing the same sort value, and they
all end up in the same partition, which then grows very large.

So I called the repartition method after sorting to get balanced
partitions, and it works well.
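
A minimal sketch of that workaround (illustrative; sortedRdd is the sorted
RDD from the original message):

// Repartition after sortBy so that the skewed partitions produced by the
// range partitioner are redistributed evenly; no single on-disk block then
// exceeds the 2GB limit. Note that repartition shuffles the data and does
// not preserve the sort order, so this only helps when the driver-side
// scan no longer depends on element ordering.
val balancedRdd = sortedRdd.repartition(40000)
balancedRdd.toLocalIterator.foreach(println)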

On 30 October 2015 at 03:13, Deng Ching-Mallete <oc...@apache.org> wrote:

> Hi Yifan,
>
> This is a known issue, please refer to
> https://issues.apache.org/jira/browse/SPARK-6235 for more details. In
> your case, it looks like you are caching to disk a partition > 2G. A
> workaround would be to increase the number of your RDD partitions in order
> to make them smaller in size.
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI <ia...@gmail.com> wrote:
>
>> I have a guess that before scanning that RDD, I sorted it and set
>> partitioning, so the result is not balanced:
>>
>> sortBy[S](f: Function <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>[T, S], ascending: Boolean, *numPartitions*: Int)
>>
>> I will try to repartition it to see if it helps.
>>
>> Best,
>> Yifan LI
>>
>>
>>
>>
>>
>> On 29 Oct 2015, at 12:52, Yifan LI <ia...@gmail.com> wrote:
>>
>> Hey,
>>
>> I was just trying to scan a large RDD sortedRdd, ~1billion elements,
>> using toLocalIterator api, but an exception returned as it was almost
>> finished:
>>
>> [stack trace snipped; same as above]
>>
>> Do you have any idea?
>>
>> I have set partitioning quite big, like 40000
>>
>>
>> Best,
>> Yifan LI
>>
>>
>>
>

Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

Posted by Deng Ching-Mallete <oc...@apache.org>.
Hi Yifan,

This is a known issue; please refer to
https://issues.apache.org/jira/browse/SPARK-6235 for more details. In your
case, it looks like you are caching a partition larger than 2GB to disk. A
workaround would be to increase the number of RDD partitions so that each
partition is smaller.
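
For reference, the limit comes from memory-mapping a single block: each
cached block must stay below Integer.MAX_VALUE bytes (~2.1 GB). With ~1
billion elements in 40000 partitions, the average partition holds only
~25000 elements, so reaching 2GB usually means either very large elements
or heavy skew. A quick way to check per-partition element counts (an
illustrative sketch, assuming sortedRdd is the sorted RDD):

// Illustrative sketch: count elements per partition to spot skew.
val counts = sortedRdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
// Print the ten most heavily loaded partitions.
counts.sortBy(-_._2).take(10).foreach { case (idx, n) =>
  println(s"partition $idx holds $n elements")
}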

HTH,
Deng

On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI <ia...@gmail.com> wrote:

> I have a guess that before scanning that RDD, I sorted it and set
> partitioning, so the result is not balanced:
>
> sortBy[S](f: Function <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>[T, S], ascending: Boolean, *numPartitions*: Int)
>
> I will try to repartition it to see if it helps.
>
> Best,
> Yifan LI
>
>
>
>
>
> On 29 Oct 2015, at 12:52, Yifan LI <ia...@gmail.com> wrote:
>
> Hey,
>
> I was just trying to scan a large RDD sortedRdd, ~1billion elements, using
> toLocalIterator api, but an exception returned as it was almost finished:
>
> [stack trace snipped; same as above]
>
> Do you have any idea?
>
> I have set partitioning quite big, like 40000
>
>
> Best,
> Yifan LI
>
>
>

Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

Posted by Yifan LI <ia...@gmail.com>.
I have a guess: before scanning that RDD, I sorted it with a fixed number of partitions, and the resulting partitions are not balanced:

sortBy[S](f: Function <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>[T, S], ascending: Boolean, numPartitions: Int)
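
A tiny local demo of that suspicion (hypothetical data, illustrative only): sortBy uses a range partitioner, and all records with the same sort key are assigned to the same partition, so one very frequent key produces one oversized partition.

// sc is a SparkContext (e.g. the spark-shell's sc). Hypothetical skewed
// data: 100000 copies of the key 1 plus a few distinct keys.
val skewed = sc.parallelize(Seq.fill(100000)(1) ++ (2 to 100), 10)
val sorted = skewed.sortBy(identity, ascending = true, numPartitions = 10)
// One partition ends up holding ~100000 elements while the others stay tiny.
sorted.glom().map(_.length).collect().foreach(println)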

I will try to repartition it to see if it helps.

Best,
Yifan LI





> On 29 Oct 2015, at 12:52, Yifan LI <ia...@gmail.com> wrote:
> 
> Hey,
> 
> I was just trying to scan a large RDD sortedRdd, ~1billion elements, using toLocalIterator api, but an exception returned as it was almost finished:
> 
> [stack trace snipped; same as above]
> 
> Do you have any idea? 
> 
> I have set partitioning quite big, like 40000
> 
> 
> Best,
> Yifan LI
> 
> 
> 
> 
>