Posted to commits@spark.apache.org by an...@apache.org on 2014/06/03 23:15:23 UTC

git commit: Synthetic GraphX Benchmark

Repository: spark
Updated Branches:
  refs/heads/master aa41a522d -> 894ecde04


Synthetic GraphX Benchmark

This PR accomplishes two things:

1. It introduces a Synthetic Benchmark application that generates an arbitrarily large log-normal graph and executes either PageRank or connected components on the graph. This can be used to profile the GraphX system on arbitrary clusters without access to large graph datasets.

2. It improves the implementation of the log-normal graph generator; the sketch below illustrates the sampling idea.
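
The core of the improved generator is sampling each vertex's out-degree from a log-normal distribution, seeded by the vertex id so the sample is reproducible wherever it is recomputed. A minimal standalone sketch of that idea (the object and method names here are illustrative, not part of the patch; the sampling expression is taken from GraphGenerators in the diff below):

    import scala.util.Random

    object LogNormalSketch {
      // Sample a vertex's out-degree from a log-normal distribution, capped
      // at the number of vertices. Seeding the RNG with the vertex id makes
      // the sample deterministic.
      def sampleDegree(src: Long, numVertices: Int,
                       mu: Double = 4.0, sigma: Double = 1.3): Long = {
        val rand = new Random(src)
        math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
      }

      def main(args: Array[String]): Unit = {
        (0L until 5L).foreach(v => println(s"vertex $v -> degree ${sampleDegree(v, 100000)}"))
      }
    }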

Author: Joseph E. Gonzalez <jo...@gmail.com>
Author: Ankur Dave <an...@gmail.com>

Closes #720 from jegonzal/graphx_synth_benchmark and squashes the following commits:

e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
bccccad [Ankur Dave] Fix long lines
374678a [Ankur Dave] Bugfix and style changes
1bdf39a [Joseph E. Gonzalez] updating options
d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples folder.
f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/894ecde0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/894ecde0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/894ecde0

Branch: refs/heads/master
Commit: 894ecde04faa7e2054a40825a58b2e9cdaa93c70
Parents: aa41a52
Author: Joseph E. Gonzalez <jo...@gmail.com>
Authored: Tue Jun 3 14:14:48 2014 -0700
Committer: Ankur Dave <an...@gmail.com>
Committed: Tue Jun 3 14:14:48 2014 -0700

----------------------------------------------------------------------
 .../spark/examples/graphx/SynthBenchmark.scala  | 128 +++++++++++++++++++
 .../apache/spark/graphx/PartitionStrategy.scala |   9 ++
 .../spark/graphx/util/GraphGenerators.scala     |  41 ++++--
 project/MimaExcludes.scala                      |   4 +-
 4 files changed, 171 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
new file mode 100644
index 0000000..551c339
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs.  The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+  /**
+   * To run this program use the following:
+   *
+   * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+   *
+   * Options:
+   *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+   *   -niter the number of iterations of pagerank to use (Default: 10)
+   *   -nverts the number of vertices in the graph (Default: 100000)
+   *   -numEPart the number of edge partitions in the graph (Default: sc.defaultParallelism)
+   *   -partStrategy the graph partitioning strategy to use (Default: none)
+   *   -mu the mean parameter for the log-normal graph (Default: 4.0)
+   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+   *   -degFile the local file to save the degree information (Default: Empty)
+   */
+  def main(args: Array[String]) {
+    val options = args.map {
+      arg =>
+        arg.dropWhile(_ == '-').split('=') match {
+          case Array(opt, v) => (opt -> v)
+          case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+        }
+    }
+
+    var app = "pagerank"
+    var niter = 10
+    var numVertices = 100000
+    var numEPart: Option[Int] = None
+    var partitionStrategy: Option[PartitionStrategy] = None
+    var mu: Double = 4.0
+    var sigma: Double = 1.3
+    var degFile: String = ""
+
+    options.foreach {
+      case ("app", v) => app = v
+      case ("niter", v) => niter = v.toInt
+      case ("nverts", v) => numVertices = v.toInt
+      case ("numEPart", v) => numEPart = Some(v.toInt)
+      case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+      case ("mu", v) => mu = v.toDouble
+      case ("sigma", v) => sigma = v.toDouble
+      case ("degFile", v) => degFile = v
+      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+    }
+
+    val conf = new SparkConf()
+      .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+    val sc = new SparkContext(conf)
+
+    // Create the graph
+    println("Creating graph...")
+    val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+    // Repartition the graph
+    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+
+    var startTime = System.currentTimeMillis()
+    val numEdges = graph.edges.count()
+    println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+    val loadTime = System.currentTimeMillis() - startTime
+
+    // Collect the degree distribution (if desired)
+    if (!degFile.isEmpty) {
+      val pos = new PrintWriter(new FileOutputStream(degFile))
+      val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+        .map(p => p._2).countByValue()
+      hist.foreach {
+        case (deg, count) => pos.println(s"$deg \t $count")
+      }
+      pos.close()
+    }
+
+    // Run PageRank
+    startTime = System.currentTimeMillis()
+    if (app == "pagerank") {
+      println("Running PageRank")
+      val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+      println(s"Total PageRank = $totalPR")
+    } else if (app == "cc") {
+      println("Running Connected Components")
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
+      println(s"Number of components = $numComponents")
+    }
+    val runTime = System.currentTimeMillis() - startTime
+
+    println(s"Num Vertices = $numVertices")
+    println(s"Num Edges = $numEdges")
+    println(s"Creation time = ${loadTime/1000.0} seconds")
+    println(s"Run time = ${runTime/1000.0} seconds")
+
+    sc.stop()
+  }
+}
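
A note on the repartitioning line above: partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) uses Option's foldLeft so that partitionBy is applied only when a strategy was supplied. A minimal standalone sketch of the idiom (the values here are illustrative):

    // foldLeft on an Option applies the function only if a value is present:
    // None.foldLeft(z)(f) == z, while Some(x).foldLeft(z)(f) == f(z, x).
    val maybeScale: Option[Int] = Some(3)
    val base = 10
    val scaled = maybeScale.foldLeft(base)((acc, s) => acc * s)
    println(scaled)  // prints 30; would print 10 if maybeScale were None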

http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 1526cce..ef412cf 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -119,4 +119,13 @@ object PartitionStrategy {
       math.abs((lower, higher).hashCode()) % numParts
     }
   }
+
+  /** Returns the PartitionStrategy with the specified name. */
+  def fromString(s: String): PartitionStrategy = s match {
+    case "RandomVertexCut" => RandomVertexCut
+    case "EdgePartition1D" => EdgePartition1D
+    case "EdgePartition2D" => EdgePartition2D
+    case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+    case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
+  }
 }
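
The new fromString helper lets applications such as SynthBenchmark select a strategy by name on the command line. A usage sketch (the graph value is assumed to exist and is not part of this patch):

    import org.apache.spark.graphx.PartitionStrategy

    // An unrecognized name throws IllegalArgumentException, per the match above.
    val strategy: PartitionStrategy = PartitionStrategy.fromString("EdgePartition2D")
    // val partitioned = graph.partitionBy(strategy)  // `graph` is an existing Graph[_, _]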

http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index a3c8de3..635514f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -38,19 +38,42 @@ object GraphGenerators {
   val RMATa = 0.45
   val RMATb = 0.15
   val RMATd = 0.25
+
   /**
    * Generate a graph whose vertex out degree is log normal.
+   *
+   * The default values for mu and sigma are taken from the Pregel paper:
+   *
+   * Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert,
+   * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
+   * Pregel: a system for large-scale graph processing. SIGMOD '10.
+   *
+   * @param sc the Spark context
+   * @param numVertices the number of vertices in the graph
+   * @param numEParts the number of edge partitions
+   * @param mu the mean of the log-normal out-degree distribution
+   * @param sigma the standard deviation of the log-normal out-degree distribution
    */
-  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
-    // based on Pregel settings
-    val mu = 4
-    val sigma = 1.3
-
-    val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
-      src => (src, sampleLogNormal(mu, sigma, numVertices))
+  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
+                     mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
+    val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
+      // Initialize the random number generator with the source vertex id
+      val rand = new Random(src)
+      val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
+      (src.toLong, degree)
     }
-    val edges = vertices.flatMap { v =>
-      generateRandomEdges(v._1.toInt, v._2, numVertices)
+    val edges = vertices.flatMap { case (src, degree) =>
+      new Iterator[Edge[Int]] {
+        // Initialize the random number generator with the source vertex id
+        val rand = new Random(src)
+        var i = 0
+        override def hasNext(): Boolean = { i < degree }
+        override def next(): Edge[Int] = {
+          val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
+          i += 1
+          nextEdge
+        }
+      }
     }
     Graph(vertices, edges, 0)
   }
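
The generator now emits edges through a custom Iterator rather than building a collection, so a high-degree vertex never materializes all of its edges at once. A minimal standalone sketch of the same lazy pattern (the helper name is illustrative, not part of the patch):

    import scala.util.Random

    // Lazily produce `degree` random edge targets for a source vertex.
    // Nothing is computed until the iterator is consumed.
    def edgeTargets(src: Long, degree: Long, numVertices: Int): Iterator[Int] = {
      val rand = new Random(src)
      Iterator.fill(degree.toInt)(rand.nextInt(numVertices))
    }

    println(edgeTargets(42L, 5L, 100000).mkString(", "))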

http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ecb389d..fc9cbea 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,8 @@ object MimaExcludes {
     val excludes =
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.1") =>
-          Seq()
+          Seq(
+            MimaBuild.excludeSparkPackage("graphx"))
         case v if v.startsWith("1.0") =>
           Seq(
             MimaBuild.excludeSparkPackage("api.java"),
@@ -58,4 +59,3 @@ object MimaExcludes {
         case _ => Seq()
       }
 }
-