Posted to user@spark.apache.org by Roman Sokolov <ol...@gmail.com> on 2015/07/11 03:58:20 UTC

Re: Spark GraphX memory requirements + java.lang.OutOfMemoryError: GC overhead limit exceeded

Hello again.
I was able to compute the triangle count when running the code from the
Spark shell without workers (with the --driver-memory 15g option), but with
workers I get errors. I start the Spark shell like this:
./bin/spark-shell --master spark://192.168.0.31:7077 --executor-memory 6900m --driver-memory 15g
and start the workers by hand:
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.31:7077
(2 workers, each with 8 GB RAM; the master has 32 GB RAM).

The code now is:
import org.apache.spark._
import org.apache.spark.graphx._
val graph = GraphLoader.edgeListFile(sc, "/home/data/graph.txt").partitionBy(PartitionStrategy.RandomVertexCut)
val newgraph = graph.convertToCanonicalEdges()
val triangleNum = newgraph.triangleCount().vertices.map(x => x._2.toLong).reduce(_ + _) / 3
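
For readers following along, here is a minimal plain-Scala sketch (no Spark) of what the last two steps compute. It is not GraphX's implementation: the names canonicalEdges, trianglesPerVertex and totalTriangles are hypothetical, and dropping self-loops is a simplification of this sketch only. It orients every edge so that srcId < dstId, merges duplicates, counts triangles per vertex by intersecting neighbour sets, and divides the sum by 3 because each triangle is counted once at each of its three vertices.

```scala
object TriangleSketch {
  // Hypothetical helper: orient each edge so that srcId < dstId and merge
  // duplicate / reverse edges. Dropping self-loops is this sketch's own choice.
  def canonicalEdges(edges: Seq[(Long, Long)]): Set[(Long, Long)] =
    edges.collect {
      case (a, b) if a != b => if (a < b) (a, b) else (b, a)
    }.toSet

  // Hypothetical helper: per-vertex triangle counts via neighbour-set intersection.
  def trianglesPerVertex(edges: Set[(Long, Long)]): Map[Long, Long] = {
    val nbrs: Map[Long, Set[Long]] =
      edges.toSeq
        .flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2).toSet }
    nbrs.map { case (v, ns) =>
      // Each triangle through v is found twice, once from each of the two
      // other corners, so the double count is halved.
      val doubleCount = ns.toSeq.map(n => (nbrs(n) & ns).size.toLong).sum
      v -> doubleCount / 2
    }
  }

  // Sum of per-vertex counts divided by 3: one triangle has three vertices.
  def totalTriangles(edges: Seq[(Long, Long)]): Long =
    trianglesPerVertex(canonicalEdges(edges)).values.sum / 3
}
```

For example, TriangleSketch.totalTriangles(Seq((1L, 2L), (2L, 1L), (2L, 3L), (3L, 1L), (3L, 4L))) treats (1,2) and (2,1) as one edge and finds the single triangle 1-2-3.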

So how can I work out how much memory is needed? And why do I need so
much? The dataset is only 1.1 GB...

Error:
[Stage 7:>                                                         (0 + 8) / 32]
15/07/11 01:48:45 WARN TaskSetManager: Lost task 2.0 in stage 7.0 (TID 130, 192.168.0.28): io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError
    at sun.misc.Unsafe.allocateMemory(Native Method)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:440)
    at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:187)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:165)
    at io.netty.buffer.PoolArena.reallocate(PoolArena.java:277)
    at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:108)
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:146)
    ... 10 more
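
A note on reading this trace: the root cause is raised from sun.misc.Unsafe.allocateMemory while Netty allocates a direct (off-heap) buffer, so the failing allocation is native memory outside the JVM heap rather than heap space. The small arithmetic sketch below assumes that "8Gb" means 8192 MB and ignores what the OS and the worker daemon use themselves; it only shows how little room --executor-memory 6900m leaves on such a machine. The name headroomMb is hypothetical.

```scala
object HeadroomSketch {
  // Memory (in MB) left on a worker machine once the executor heap is reserved.
  // Assumption: nothing else is subtracted, so the real headroom is smaller.
  def headroomMb(machineRamMb: Long, executorHeapMb: Long): Long =
    machineRamMb - executorHeapMb

  def main(args: Array[String]): Unit = {
    // Figures taken from this message: 8 GB workers, --executor-memory 6900m.
    println(headroomMb(8L * 1024L, 6900L))
  }
}
```

Under these assumptions about 1292 MB remain for everything that is not executor heap, so lowering --executor-memory may be worth trying; whether that is enough here is untested.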


On 26 June 2015 at 14:06, Roman Sokolov <ol...@gmail.com> wrote:

> Yep, I already found it. So I added 1 line:
>
> val graph = GraphLoader.edgeListFile(sc, "....", ...)
> val newgraph = graph.convertToCanonicalEdges()
>
> and could successfully count triangles on "newgraph". Next I will test it
> on bigger (several GB) networks.
>
> I am using Spark 1.3 and 1.4 but haven't seen this function in
> https://spark.apache.org/docs/latest/graphx-programming-guide.html
>
> Thanks a lot guys!
> On 26.06.2015 at 13:50, "Ted Yu" <yu...@gmail.com> wrote:
>
>> See SPARK-4917 which went into Spark 1.3.0
>>
>> On Fri, Jun 26, 2015 at 2:27 AM, Robin East <ro...@xense.co.uk>
>> wrote:
>>
>>> You’ll get this issue if you just take the first 2000 lines of that
>>> file. The problem is that triangleCount() expects srcId < dstId, which is
>>> not the case in the file (e.g. vertex 28). You can get round this by calling
>>> graph.convertToCanonicalEdges(), which removes bi-directional edges and
>>> ensures srcId < dstId. Which version of Spark are you on? I can’t remember
>>> which version that method was introduced in.
>>>
>>> Robin
>>>
>>> On 26 Jun 2015, at 09:44, Roman Sokolov <ol...@gmail.com> wrote:
>>>
>>> OK, but what does it mean? I did not change the core files of Spark, so
>>> is it a bug there?
>>> PS: on small datasets (<500 MB) I have no problem.
>>> On 25.06.2015 at 18:02, "Ted Yu" <yu...@gmail.com> wrote:
>>>
>>>> The assertion failure from TriangleCount.scala corresponds with the
>>>> following lines:
>>>>
>>>>     g.outerJoinVertices(counters) {
>>>>       (vid, _, optCounter: Option[Int]) =>
>>>>         val dblCount = optCounter.getOrElse(0)
>>>>         // double count should be even (divisible by two)
>>>>         assert((dblCount & 1) == 0)
>>>>
>>>> Cheers
>>>>
>>>> On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov <ol...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>> I am trying to compute the number of triangles with GraphX, but I get
>>>>> memory or heap size errors even though the dataset is very small (1 GB).
>>>>> I run the code in spark-shell on a machine with 16 GB RAM (I also tried
>>>>> with 2 workers on separate machines with 8 GB RAM each). So I have 15x
>>>>> more memory than the dataset size, but it is not enough. What should I do
>>>>> with terabyte-sized datasets? How do people process them? I have read a
>>>>> lot of documentation and 2 Spark books, and still have no clue :(
>>>>>
>>>>> I tried running with these options, with no effect:
>>>>> ./bin/spark-shell --executor-memory 6g --driver-memory 9g --total-executor-cores 100
>>>>>
>>>>> The code is simple:
>>>>>
>>>>> import org.apache.spark.graphx._
>>>>> import org.apache.spark.storage.StorageLevel
>>>>> val graph = GraphLoader.edgeListFile(sc,
>>>>>   "/home/ubuntu/data/soc-LiveJournal1/lj.stdout",
>>>>>   edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
>>>>>   vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER
>>>>> ).partitionBy(PartitionStrategy.RandomVertexCut)
>>>>>
>>>>> println(graph.numEdges)
>>>>> println(graph.numVertices)
>>>>>
>>>>> val triangleNum = graph.triangleCount().vertices.map(x => x._2).reduce(_ + _) / 3
>>>>>
>>>>> (The dataset is from here:
>>>>> http://konect.uni-koblenz.de/downloads/tsv/soc-LiveJournal1.tar.bz2
>>>>> The first two lines contain % characters, so they have to be removed.)
>>>>>
>>>>>
>>>>> UPD: today I tried on a 32 GB machine (from the Spark shell again) and
>>>>> now got another error:
>>>>>
>>>>> [Stage 8:>                                                         (0 + 4) / 32]
>>>>> 15/06/25 13:03:05 WARN ShippableVertexPartitionOps: Joining two VertexPartitions with different indexes is slow.
>>>>> 15/06/25 13:03:05 ERROR Executor: Exception in task 3.0 in stage 8.0 (TID 227)
>>>>> java.lang.AssertionError: assertion failed
>>>>>     at scala.Predef$.assert(Predef.scala:165)
>>>>>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>>>>>     at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>>>>>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>>>>>     at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:133)
>>>>>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>>>>>     at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>>>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards, Roman Sokolov
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>


-- 
Best regards, Roman Sokolov

Re: Spark GraphX memory requirements + java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by re...@nzz.ch.
Hi –

I'd like to follow up on this, as I am running into very similar issues (with a much bigger data set, though – 10^5 nodes, 10^7 edges).

So let me repost the question: any ideas on how to estimate GraphX memory requirements?
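
As a starting point for that question, here is a rough back-of-the-envelope sketch in plain Scala. Every factor in it is an illustrative assumption rather than a measured GraphX number: the case class Assumptions, its default values and the function estimateBytes are all hypothetical. The model only captures that triangle counting keeps the edge list plus one neighbour-set entry per edge endpoint, and that vertex data may be copied to several edge partitions.

```scala
object GraphMemorySketch {
  // All defaults are placeholder assumptions; replace them with measured values.
  final case class Assumptions(
    bytesPerEdge: Long = 24L,     // two 8-byte vertex ids plus some slack
    bytesPerSetEntry: Long = 16L, // 8-byte id plus hash-set slack
    replication: Double = 2.0     // average copies of a vertex across partitions
  )

  // Rough lower-bound style estimate of the bytes held while counting triangles.
  def estimateBytes(numEdges: Long, a: Assumptions = Assumptions()): Long = {
    val edgeBytes = numEdges * a.bytesPerEdge
    // Every edge adds one entry to the neighbour set of each of its endpoints.
    val setBytes = (2L * numEdges * a.bytesPerSetEntry * a.replication).toLong
    edgeBytes + setBytes
  }
}
```

With the 10^7 edges mentioned above and these placeholder factors, GraphMemorySketch.estimateBytes(10000000L) returns 880000000 bytes, before JVM object overhead, serialization buffers or shuffle data. The real figure has to be measured on the actual job, and it can be far higher than this estimate.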

Cheers!
