Posted to dev@tinkerpop.apache.org by Marko Rodriguez <ok...@gmail.com> on 2016/01/29 22:08:48 UTC
Ruminations on SparkGraphComputer at Scale
Hello,
So this week, during TinkerPop's code freeze, Kuppitz and I have been stress testing SparkGraphComputer on a 4 Blade cluster using the Friendster dataset (125M vertices and 2.5B edges).
This is a list of things we learned and fixed.
First, Daniel Kuppitz wrote a really helpful script that gave us a huge boost in testing turnaround time. It does the following:
1. Pulls the latest from git://.
2. Builds the code (being smart to delete grapes!).
3. Installs (:install) the Hadoop/Spark plugins into the Gremlin console.
4. Distributes the HADOOP_GREMLIN_LIBS jars to all SparkServer nodes in the cluster.
5. Restarts the Spark cluster.
6. Plops you at the console ready to rock.
Summary: here are the runtimes for g.V().count() over SparkGraphComputer's life time with Friendster.
- TinkerPop 3.0.0.MX: 2.5 hours
- TinkerPop 3.0.0: 1.5 hours
- TinkerPop 3.1.1: 23 minutes
*** Of course, this is NOT good for g.V().count() in general, but realize we are loading the entire graph, even though we only need the vertices (no edges or properties).
https://issues.apache.org/jira/browse/TINKERPOP-962 (g.V().count() should really only take ~5 minutes)
For g.V().out().out().out().count() over Friendster:
- TinkerPop 3.0.0.MX: 12.8 hours
- TinkerPop 3.0.0: 8.6 hours
- TinkerPop 3.1.1: 2.4 hours
Answer: 215664338057221 (that's 215 trillion length-3 paths in Friendster)
1. It's all about GC control.
- Use many workers (we have 5 per machine), each with a relatively small heap (10 gigs each).
- The massive-heap, store-everything-in-memory model doesn't work -- it just leads to GC stalls.
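For what it's worth, the "many small workers" layout in Spark standalone mode is just a couple of spark-env.sh lines. The values below are our setup, not a universal recommendation -- tune for your hardware:

```sh
# spark-env.sh (fragment): 5 workers per machine, each with a modest 10g heap.
# Many small heaps keep individual GC pauses short; one giant heap just stalls.
export SPARK_WORKER_INSTANCES=5
export SPARK_WORKER_MEMORY=10g
```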
2. Make use of TinkerPop's new gremlin.spark.graphStorageLevel (default is MEMORY_ONLY).
- I like DISK_ONLY as the whole RDD is cached in the cluster's file system.
- No fetching back to HDFS for data (especially when you are using ScriptInputFormat which is expensive!)
- And you definitely don't want to go back to the graph database and stress it needlessly.
- I don't like MEMORY_AND_DISK unless you know your whole graph will fit in memory. Once you have to start swapping things out of memory: GC.
- If you have lots and lots of workers (a big cluster), then MEMORY_AND_DISK might be good. It'd be cool if someone tested it.
- DISK_ONLY is a lot like Hadoop: streaming in records from the disk (it's fast).
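As a config sketch, the storage level above is just a property in your Hadoop-Gremlin configuration file (the property name is the one from the reference docs; the surrounding file is assumed):

```properties
# hadoop-gryo.properties (fragment): cache the loaded graphRDD on the cluster's local disks
gremlin.spark.graphStorageLevel=DISK_ONLY
```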
3. I had an insane-o bug in our Combiner implementation: < MAX_AMOUNT should have been <= MAX_AMOUNT.
- For g.V().count(), I went from shuffling 500M of data over the network to 6.4K.
- The job sped up by 30 minutes after that fix.
- Again, the main reason it was so slow: GC. A 500MB stream of long payloads reduced to a single machine. Moron.
- It's a really bad idea NOT to use combiners for both VertexProgram and MapReduce.
- This is like the difference between working and not working.
- Also, this is what scares me about path-based traversers (match(), as(), path(), etc.). They can't be bulk'd easily.
- We will get smart here though. I have some inklings.
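To make the off-by-one concrete, here is a toy sketch of the bulking idea (this is NOT the actual TinkerPop Combiner code -- the class, method, and cap value are all made up for illustration). Counts accumulate locally up to a cap before a payload is emitted; with < instead of <=, a bulk that lands exactly on the cap gets emitted prematurely instead of combined:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of combiner-style bulking (names and cap are illustrative).
public class CombinerSketch {

    static final long MAX_AMOUNT = 1_000_000L; // illustrative cap, not TinkerPop's value

    // Combine a stream of per-traverser counts into as few payloads as possible.
    static List<Long> combine(long[] increments) {
        List<Long> emitted = new ArrayList<>();
        long bulk = 0L;
        for (long inc : increments) {
            if (bulk + inc <= MAX_AMOUNT) {   // the fix: <= lets a bulk reach the cap
                bulk += inc;                  // combine locally -- no network traffic
            } else {
                emitted.add(bulk);            // cap exceeded: emit one combined payload
                bulk = inc;
            }
        }
        if (bulk > 0) emitted.add(bulk);
        return emitted;
    }

    public static void main(String[] args) {
        long[] counts = new long[500];
        java.util.Arrays.fill(counts, 10_000L);   // 500 local counts of 10,000 each
        List<Long> payloads = combine(counts);
        // 500 x 10,000 = 5,000,000 total, combined into 5 payloads
        System.out.println(payloads.size());
    }
}
```

The point of the sketch: the fewer (bigger) payloads you emit, the less crosses the network and the less a single reducer has to garbage collect.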
4. It's an absolute must that you partition your graph once it's loaded.
- Once the graph is loaded (graphDB, HDFS, etc.), the Spark partitioner "organizes" the graph around the cluster and then persists it.
- For Friendster, this takes about 15 minutes (w/ ScriptInputFormat as the read from HDFS parser).
- This is important because the loaded graphRDD is static and just gets a view propagated through it at each iteration. You don't want to keep shuffling this monster on each iteration.
- This is also why PersistedXXXRDD is crucial. If you are going to keep running jobs on the same data, the RDD being reused is already partitioned for you! (tada)
- For graph system providers, if you provide an InputRDD and you have a partitioner for it, that is a huge savings for SparkGraphComputer. So smart to do so.
- By partitioning upfront, I was able to reduce the shuffle load from ~22GB to ~2GB per vertex program iteration on Friendster. Insane.
- I was a fool before. I now know how to read Spark logs :) which is probably a good thing for me to know.
- This is so important that we now just do it automatically.
- However, if the data source and the Spark cluster are already "pair partitioned" we don't repartition! (elegant).
- http://tinkerpop.incubator.apache.org/docs/3.1.1-SNAPSHOT/reference/#sparkgraphcomputer (scroll down to the "InputRDD and OutputRDD"-section).
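The mechanics behind point 4 can be sketched in a few lines of plain Java (all names here are hypothetical -- this is an illustration of hash partitioning, not TinkerPop code). Because partition assignment is a pure function of the key, a vertex record and its view/messages always land in the same partition, so the per-iteration join is local:

```java
import java.util.Arrays;
import java.util.List;

// Toy sketch of why co-partitioning kills the per-iteration shuffle.
public class PartitionSketch {

    // Spark-style hash partitioning: a pure function of the key,
    // so the same key always lands in the same partition.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        List<String> vertexIds = Arrays.asList("v1", "v2", "v3", "v4", "v5");
        for (String id : vertexIds) {
            // The graphRDD record and the view/message record for the same
            // vertex id hash identically -- joining graphRDD with the view
            // therefore needs no network movement of the big graph side.
            int graphPartition = partitionFor(id, numPartitions);
            int viewPartition = partitionFor(id, numPartitions);
            System.out.println(id + " -> graph partition " + graphPartition
                    + ", view partition " + viewPartition);
        }
    }
}
```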
5. The graph data message/view shuffle is a lot of data. Make use of lots of TinkerPop workers() to reduce spills to disk.
- TinkerPop 3.1.0 introduced GraphComputer.workers(). In SparkGraphComputer, this is the number of partitions in the RDD.
- For Friendster, ScriptInputFormat gives me 229 partitions and g.V().count() takes 48 minutes.
- If I 5x this to 1145 using "workers(1145)", g.V().count() takes 25 minutes.
- That's a 2x speed-up from just chopping the data into finer slices.
- However, for 2290 workers, g.V().count() only gets marginally better -- 23 minutes.
- This is all about not spilling to disk and not getting GC all up in it.
- Now imagine if the graph provider's InputRDD already has a partitioner -- you are looking at ~10 minutes for g.V().count() on Friendster (or like 1 minute if we don't load edges)!
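For reference, the knob in question is the GraphComputer API from the Gremlin console (the properties file name is the standard one from the reference docs; the rest of the job setup is elided):

```groovy
// Gremlin console sketch: 5x the 229 ScriptInputFormat splits via workers().
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.compute(SparkGraphComputer).workers(1145)
```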
6. I think we need to make Gryo more efficient. I don't think our serialization is optimal :/. The data seems oversized for what it is. This is all assumptions right now.
- I also use JavaSerializer for tinkerpop.Payload data and, given that that is a significant chunk of what is shuffled, it might be more that than Gryo. :|
- https://issues.apache.org/jira/browse/TINKERPOP-1110
7. There is one last area of the implementation that I think could be improved. But besides that (and minor "use fewer objects"-style optimizations), I think SparkGraphComputer *is* how it should be.
- https://issues.apache.org/jira/browse/TINKERPOP-1108
- If you are a Spark expert, please do review the code and provide feedback. It's really not that much code.
- https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
- https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
Anywho… that's that. TinkerPop 3.1.1 will be sweeeeet.
Enjoy!,
Marko.
http://markorodriguez.com
Re: Ruminations on SparkGraphComputer at Scale
Posted by HadoopMarc <m....@xs4all.nl>.
Hi Marko,
Thanks for your enthusiastic and useful report! We had similar
experiences over here. SparkGraphComputer seems to like small chunks of
data of 128MB or so, even if you have 8 or 16 GB in your executors.
In addition, when running Spark/Yarn, you need a high spark.yarn.executor.memoryOverhead
value of about 20%, while 6-10% is mentioned in the SparkYarn reference
https://spark.apache.org/docs/1.5.2/running-on-yarn.html .
Otherwise, the executor starves when Yarn is set to police queues.
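For example (the numbers below are illustrative, not a recommendation for every cluster):

```properties
# spark-defaults.conf (fragment): ~20% of executor memory as off-heap overhead.
# The Spark 1.5.x default is max(384, 10% of executor memory), in MB.
spark.executor.memory=8g
spark.yarn.executor.memoryOverhead=1638
```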
I am sorry I cannot provide any quantitative data, but I thought I'd mention
it anyway, to give people a hint about which knobs to tune.
Cheers, Marc