You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tinkerpop.apache.org by Marko Rodriguez <ok...@gmail.com> on 2016/01/29 22:08:48 UTC

Ruminations on SparkGraphComputer at Scale

Hello,

So this week, during TinkerPop's code freeze, Kuppitz and I have been stress testing SparkGraphComputer on a 4 Blade cluster using the Friendster dataset (125M vertices and 2.5B edges).

This is a list of things we learned and fixed.

	First, Daniel Kuppitz wrote this really helpful script that gave us a huge boost in testing time turn arounds. It does the following:
		1. Pulls the latest from git://.
		2. Builds the code (being smart to delete grapes!)
		3. :install hadoop/spark plugins into the Gremlin console.
		4. distribute HADOOP_GREMLIN_LIBS jars to all SparkServer nodes in the cluster.
		5. restarts the Spark cluster.
		6. plops you at the console ready to rock.

	Summary: here are the runtimes for g.V().count() over SparkGraphComputer's life time with Friendster.
		- TinkerPop 3.0.0.MX:  2.5 hours
		- TinkerPop 3.0.0:         1.5 hours
		- TinkerPop 3.1.1:         23 minutes

		*** Of course, this is NOT good for g.V().count() in general, but realize we are loading the entire graph, even though we only need vertices (no edge or properties).
			https://issues.apache.org/jira/browse/TINKERPOP-962 (g.V().count() should really only take ~5 minutes)

		For g.V().out().out().out().count() over Friendster:
			- TinkerPop 3.0.0.MX: 12.8 hours
			- TinkerPop 3.0.0:        8.6 hours
			- TinkerPop 3.1.1:        2.4 hours
		Answer: 215664338057221 (thats 215 trillion length 3 paths in Friendster)

	1. Its all about GC control. 
		- Make many workers (we have 5 per machine) each with relatively small heaps (10 gigs each).
		- The massive heap and store everything in memory model doesn't work -- it just leads to GC stalls.
	
	2. Make use of TinkerPop's new gremlin.spark.graphStorageLevel (default is MEMORY_ONLY).
		- I like DISK_ONLY as the whole RDD is cached in the cluster's file system. 
			- No fetching back to HDFS for data (especially when you are using ScriptInputFormat which is expensive!)
			- And you definitely don't want to go back to the graph database and stress it needlessly.
		- I don't like DISK_AND_MEM unless you know your whole graph will fit in memory. Once you have to start swapping things out of memory, GC.
			- If you have lots and lots of workers (a big cluster), then DISK_AND_MEM might be good. Be cool if someone tested it.	
		- DISK_ONLY is a lot like Hadoop. Streaming in records at a time from the disk (its fast).

	3. I had an insane-o bug in our Combiner implementation. < MAX_AMOUNT should have been <= MAX_AMOUNT.
		- For g.V().count(), I went from shuffling 500M of data over the network to 6.4K.
		- The job sped up by 30 minutes after that fix.
			- Again, the main reason it was so slow, GC. 500mb stream of long payloads reduced to a single machine. Moron.
		- Its a really bad idea to NOT use combiners for both VertexProgram and MapReduce. 
			- This is like the difference between working and not working.
			- Also, this is what scares me about path-based traversers (match(), as(), path(), etc.). They can't be bulk'd easily.
				- We will get smart here though. I have some inklings.
	
	4. Its an absolute must that you partition your graph once loaded.
		- Once the graph is loaded (graphDB, HDFS, etc.), the Spark partitioner "organizes" the graph around the cluster and then persists it.
			- For Friendster, this takes about 15 minutes (w/ ScriptInputFormat as the read from HDFS parser).
			- This is important because the loaded graphRDD is static and just gets a view propagated through it at each iteration. You don't want to keep shuffling this monster on each iteration.
			- This is also why PersistedXXXRDD is crucial. If you are going to keep running jobs on the same data, the RDD being reused is already partitioned for you! (tada)
				- For graph system providers, if you provide an InputRDD and you have a partitioner for it, that is a huge savings for SparkGraphComputer. So smart to do so.
		- By partitioning upfront, I was able to reduce the shuffle load from ~22GB to ~2GB per vertex program iteration on Friendster. Insane.
			- I was a fool before. I now know how to read Spark logs :) which is probably a good thing for me to know.
		- This is so important that we now just do it automatically.
			- However, if the data source and the Spark cluster are already "pair partitioned" we don't repartition! (elegant).
			- http://tinkerpop.incubator.apache.org/docs/3.1.1-SNAPSHOT/reference/#sparkgraphcomputer (scroll down to the "InputRDD and OutputRDD"-section).
	
	5. The graph data message/view shuffle is a lot of data. Make use of lots of TinkerPop workers() to reduce spills to disk.
		- TinkerPop 3.1.0 introduced GraphComputer.workers(). In SparkGraphComputer, this is the number of partitions in the RDD.
		- For Friendster, ScriptInputFormat gives me 229 partitions and g.V().count() takes 48 minutes.
 			- If I 5x this to 1145 using "workers(1145)", g.V().count() takes 25 minutes. 
			- Thats a 2x speed up but just chopping the data into finer slices.
			- However, for 2290 workers, g.V().count() only gets marginally better -- 23 minutes.
		- This is all about not spilling to disk and not getting GC all up in it.	
		- Now imagine if the graph provider's InputRDD already has a partitioner -- you are looking at ~10 minutes to g.V().count() Friendster (or like 1 minute if we don't load edges)!		

	5. I think we need to make Gryo more efficient. I don't think our serialization is optimal :/. Data seems over sized for what it is. This is all assumptions right now.
		- I also use JavaSerializer for tinkerpop.Payload data and given that that is a significant chunk of what is shuffled --- it might be more that than Gryo. :|
			- https://issues.apache.org/jira/browse/TINKERPOP-1110

	6. There is one last area of the implementation that I think could be improved. But besides that (and minor "use less objects"-style optimizations), I think SparkGraphComputer *is* how it should be.
		- https://issues.apache.org/jira/browse/TINKERPOP-1108
		- If you are a Spark expert, please do review the code and provide feedback. Its really not that much code.
			- https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
			- https://github.com/apache/incubator-tinkerpop/blob/09a5d288c4143f2853386ce908c82d9ced3c30e7/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java

Anywho…. thats that. TinkerPop 3.1.1 will be sweeeeet.

Enjoy!,
Marko.

http://markorodriguez.com


Re: Ruminations on SparkGraphComputer at Scale

Posted by HadoopMarc <m....@xs4all.nl>.
Hi Marko,

Thanks for your enthousiastic and useful report! We had similiar 
experiences over here. SparkGraphcomputer seems to like small chunks of 
data of 128MB or so, even if you have 8 or 16 Gb in your executors.

In addition, when running Spark/Yarn, you need a high spark.yarn.executor.memoryOverhead 
value of about 20%, while 6-10% is mentioned in the SparkYarn reference 
https://spark.apache.org/docs/1.5.2/running-on-yarn.html . 
<https://spark.apache.org/docs/1.5.2/running-on-yarn.html>
Otherwise, the executor starves when Yarn is set to police queues.
I am sorry I cannot provide any quantative data, but I thought I'd mention 
it anyway, to give people a hint which knobs to tune.

Cheers,     Marc