You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/13 20:29:12 UTC

[2/2] flink git commit: [FLINK-2909] [gelly] Graph Generators

[FLINK-2909] [gelly] Graph Generators

Initial set of scale-free graph generators:
- Complete graph
- Cycle graph
- Empty graph
- Grid graph
- Hypercube graph
- Path graph
- RMat graph
- Singleton edge graph
- Star graph

This closes #1807


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7a1b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7a1b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7a1b8

Branch: refs/heads/master
Commit: b0a7a1b813b29dbfd5c959ee2929372c31befda8
Parents: 5350bc4
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Mar 15 16:59:02 2016 -0400
Committer: EC2 Default User <ec...@ip-10-0-35-26.ec2.internal>
Committed: Wed Apr 13 18:25:29 2016 +0000

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   | 644 +++++++++++++++++++
 docs/page/css/flink.css                         |  21 +
 .../flink/util/LongValueSequenceIterator.java   | 193 ++++++
 .../util/LongValueSequenceIteratorTest.java     |  92 +++
 .../java/org/apache/flink/api/java/Utils.java   |   2 +-
 .../apache/flink/graph/examples/Graph500.java   | 121 ++++
 .../graph/generator/AbstractGraphGenerator.java |  33 +
 .../flink/graph/generator/CompleteGraph.java    | 112 ++++
 .../flink/graph/generator/CycleGraph.java       |  60 ++
 .../flink/graph/generator/EmptyGraph.java       |  79 +++
 .../flink/graph/generator/GraphGenerator.java   |  51 ++
 .../graph/generator/GraphGeneratorUtils.java    | 119 ++++
 .../apache/flink/graph/generator/GridGraph.java | 165 +++++
 .../flink/graph/generator/HypercubeGraph.java   |  65 ++
 .../apache/flink/graph/generator/PathGraph.java |  60 ++
 .../apache/flink/graph/generator/RMatGraph.java | 316 +++++++++
 .../graph/generator/SingletonEdgeGraph.java     | 107 +++
 .../apache/flink/graph/generator/StarGraph.java | 100 +++
 .../random/AbstractGeneratorFactory.java        |  72 +++
 .../flink/graph/generator/random/BlockInfo.java |  82 +++
 .../random/JDKRandomGeneratorFactory.java       |  72 +++
 .../random/MersenneTwisterFactory.java          |  72 +++
 .../graph/generator/random/RandomGenerable.java |  41 ++
 .../random/RandomGenerableFactory.java          |  57 ++
 .../graph/generator/AbstractGraphTest.java      |  33 +
 .../graph/generator/CompleteGraphTest.java      |  84 +++
 .../flink/graph/generator/CycleGraphTest.java   |  83 +++
 .../flink/graph/generator/EmptyGraphTest.java   |  78 +++
 .../flink/graph/generator/GridGraphTest.java    |  93 +++
 .../graph/generator/HypercubeGraphTest.java     |  85 +++
 .../flink/graph/generator/PathGraphTest.java    |  83 +++
 .../flink/graph/generator/RMatGraphTest.java    |  70 ++
 .../graph/generator/SingletonEdgeGraphTest.java |  84 +++
 .../flink/graph/generator/StarGraphTest.java    |  85 +++
 .../apache/flink/graph/generator/TestUtils.java | 103 +++
 35 files changed, 3616 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index d803be2..53d628d 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2012,3 +2012,647 @@ vertex represents a group of vertices and each edge represents a group of edges
 vertex and edge in the output graph stores the common group value and the number of represented elements.
 
 {% top %}
+
+Graph Generators
+-----------
+
+Gelly provides a collection of scalable graph generators. Each generator is
+
+* parallelizable, in order to create large datasets
+* scale-free, generating the same graph regardless of parallelism
+* thrifty, using as few operators as possible
+
+Graph generators are configured using the builder pattern. The parallelism of generator
+operators can be set explicitly by calling `setParallelism(parallelism)`. Lowering the
+parallelism will reduce the allocation of memory and network buffers.
+
+Graph-specific configuration must be called first, then configuration common to all
+generators, and lastly the call to `generate()`. The following example configures a
+grid graph with two dimensions, configures the parallelism, and generates the graph.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+boolean wrapEndpoints = false;
+
+int parallelism = 4;
+
+Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+    .addDimension(2, wrapEndpoints)
+    .addDimension(4, wrapEndpoints)
+    .setParallelism(parallelism)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.GridGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val wrapEndpoints = false
+
+val parallelism = 4
+
+val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).setParallelism(parallelism).generate()
+{% endhighlight %}
+</div>
+</div>
+
+### Provided graph generators
+
+* [Complete Graph](#complete-graph)
+* [Cycle Graph](#cycle-graph)
+* [Empty Graph](#empty-graph)
+* [Grid Graph](#grid-graph)
+* [Hypercube Graph](#hypercube-graph)
+* [Path Graph](#path-graph)
+* [RMat Graph](#rmat-graph)
+* [Singleton Edge Graph](#singleton-edge-graph)
+* [Star Graph](#star-graph)
+
+### Complete Graph
+
+An undirected graph connecting every distinct pair of vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CompleteGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CompleteGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="270" y1="40" x2="489" y2="199" />
+    <line x1="270" y1="40" x2="405" y2="456" />
+    <line x1="270" y1="40" x2="135" y2="456" />
+    <line x1="270" y1="40" x2="51" y2="199" />
+
+    <line x1="489" y1="199" x2="405" y2="456" />
+    <line x1="489" y1="199" x2="135" y2="456" />
+    <line x1="489" y1="199" x2="51" y2="199" />
+
+    <line x1="405" y1="456" x2="135" y2="456" />
+    <line x1="405" y1="456" x2="51" y2="199" />
+
+    <line x1="135" y1="456" x2="51" y2="199" />
+
+    <circle cx="270" cy="40" r="20" />
+    <text x="270" y="40">0</text>
+
+    <circle cx="489" cy="199" r="20" />
+    <text x="489" y="199">1</text>
+
+    <circle cx="405" cy="456" r="20" />
+    <text x="405" y="456">2</text>
+
+    <circle cx="135" cy="456" r="20" />
+    <text x="135" y="456">3</text>
+
+    <circle cx="51" cy="199" r="20" />
+    <text x="51" y="199">4</text>
+</svg>
+
+### Cycle Graph
+
+An undirected graph where all edges form a single cycle.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CycleGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="270" y1="40" x2="489" y2="199" />
+    <line x1="489" y1="199" x2="405" y2="456" />
+    <line x1="405" y1="456" x2="135" y2="456" />
+    <line x1="135" y1="456" x2="51" y2="199" />
+    <line x1="51" y1="199" x2="270" y2="40" />
+
+    <circle cx="270" cy="40" r="20" />
+    <text x="270" y="40">0</text>
+
+    <circle cx="489" cy="199" r="20" />
+    <text x="489" y="199">1</text>
+
+    <circle cx="405" cy="456" r="20" />
+    <text x="405" y="456">2</text>
+
+    <circle cx="135" cy="456" r="20" />
+    <text x="135" y="456">3</text>
+
+    <circle cx="51" cy="199" r="20" />
+    <text x="51" y="199">4</text>
+</svg>
+
+### Empty Graph
+
+The graph containing no edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.EmptyGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new EmptyGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="80"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <circle cx="30" cy="40" r="20" />
+    <text x="30" y="40">0</text>
+
+    <circle cx="150" cy="40" r="20" />
+    <text x="150" y="40">1</text>
+
+    <circle cx="270" cy="40" r="20" />
+    <text x="270" y="40">2</text>
+
+    <circle cx="390" cy="40" r="20" />
+    <text x="390" y="40">3</text>
+
+    <circle cx="510" cy="40" r="20" />
+    <text x="510" y="40">4</text>
+</svg>
+
+### Grid Graph
+
+An undirected graph connecting vertices in a regular tiling in one or more dimensions.
+Each dimension is configured separately. When the dimension size is at least three the
+endpoints are optionally connected by setting `wrapEndpoints`. Changing the following
+example to `addDimension(4, true)` would connect `0` to `3` and `4` to `7`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+boolean wrapEndpoints = false;
+
+Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+    .addDimension(2, wrapEndpoints)
+    .addDimension(4, wrapEndpoints)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.GridGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val wrapEndpoints = false
+
+val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="200"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="30" y1="40" x2="510" y2="40" />
+    <line x1="30" y1="160" x2="510" y2="160" />
+
+    <line x1="30" y1="40" x2="30" y2="160" />
+    <line x1="190" y1="40" x2="190" y2="160" />
+    <line x1="350" y1="40" x2="350" y2="160" />
+    <line x1="510" y1="40" x2="510" y2="160" />
+
+    <circle cx="30" cy="40" r="20" />
+    <text x="30" y="40">0</text>
+
+    <circle cx="190" cy="40" r="20" />
+    <text x="190" y="40">1</text>
+
+    <circle cx="350" cy="40" r="20" />
+    <text x="350" y="40">2</text>
+
+    <circle cx="510" cy="40" r="20" />
+    <text x="510" y="40">3</text>
+
+    <circle cx="30" cy="160" r="20" />
+    <text x="30" y="160">4</text>
+
+    <circle cx="190" cy="160" r="20" />
+    <text x="190" y="160">5</text>
+
+    <circle cx="350" cy="160" r="20" />
+    <text x="350" y="160">6</text>
+
+    <circle cx="510" cy="160" r="20" />
+    <text x="510" y="160">7</text>
+</svg>
+
+### Hypercube Graph
+
+An undirected graph where edges form an n-dimensional hypercube. Each vertex
+in a hypercube connects to one other vertex in each dimension.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long dimensions = 3;
+
+Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.HypercubeGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val dimensions = 3
+
+val graph = new HypercubeGraph(env.getJavaEnv, dimensions).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="320"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="190" y1="120" x2="350" y2="120" />
+    <line x1="190" y1="200" x2="350" y2="200" />
+    <line x1="190" y1="120" x2="190" y2="200" />
+    <line x1="350" y1="120" x2="350" y2="200" />
+
+    <line x1="30" y1="40" x2="510" y2="40" />
+    <line x1="30" y1="280" x2="510" y2="280" />
+    <line x1="30" y1="40" x2="30" y2="280" />
+    <line x1="510" y1="40" x2="510" y2="280" />
+
+    <line x1="190" y1="120" x2="30" y2="40" />
+    <line x1="350" y1="120" x2="510" y2="40" />
+    <line x1="190" y1="200" x2="30" y2="280" />
+    <line x1="350" y1="200" x2="510" y2="280" />
+
+    <circle cx="190" cy="120" r="20" />
+    <text x="190" y="120">0</text>
+
+    <circle cx="350" cy="120" r="20" />
+    <text x="350" y="120">1</text>
+
+    <circle cx="190" cy="200" r="20" />
+    <text x="190" y="200">2</text>
+
+    <circle cx="350" cy="200" r="20" />
+    <text x="350" y="200">3</text>
+
+    <circle cx="30" cy="40" r="20" />
+    <text x="30" y="40">4</text>
+
+    <circle cx="510" cy="40" r="20" />
+    <text x="510" y="40">5</text>
+
+    <circle cx="30" cy="280" r="20" />
+    <text x="30" y="280">6</text>
+
+    <circle cx="510" cy="280" r="20" />
+    <text x="510" y="280">7</text>
+</svg>
+
+### Path Graph
+
+An undirected graph where all edges form a single path.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.PathGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new PathGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="80"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="30" y1="40" x2="510" y2="40" />
+
+    <circle cx="30" cy="40" r="20" />
+    <text x="30" y="40">0</text>
+
+    <circle cx="150" cy="40" r="20" />
+    <text x="150" y="40">1</text>
+
+    <circle cx="270" cy="40" r="20" />
+    <text x="270" y="40">2</text>
+
+    <circle cx="390" cy="40" r="20" />
+    <text x="390" y="40">3</text>
+
+    <circle cx="510" cy="40" r="20" />
+    <text x="510" y="40">4</text>
+</svg>
+
+### RMat Graph
+
+A directed or undirected power-law graph generated using the
+[Recursive Matrix (R-Mat)](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model.
+
+RMat is a stochastic generator configured with a source of randomness implementing the
+`RandomGenerableFactory` interface. Provided implementations are `JDKRandomGeneratorFactory`
+and `MersenneTwisterFactory`. These generate an initial sequence of random values which are
+then used as seeds for generating the edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+int vertexCount = 1 << scale;
+int edgeCount = edgeFactor * vertexCount;
+
+Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.RMatGraph
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 1 << scale
+val edgeCount = edgeFactor * vertexCount
+
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+The default RMat constants can be overridden as shown in the following example.
+The constants define the interdependence of bits from each generated edge's source
+and target labels. The RMat noise can be enabled and progressively perturbs the
+constants while generating each edge.
+
+The RMat generator can be configured to produce a simple graph by removing self-loops
+and duplicate edges. Symmetrization is performed either by a "clip-and-flip" throwing away
+the half matrix above the diagonal or a full "flip" preserving and mirroring all edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+int vertexCount = 1 << scale;
+int edgeCount = edgeFactor * vertexCount;
+
+boolean clipAndFlip = false;
+
+Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+    .setConstants(0.57f, 0.19f, 0.19f)
+    .setNoise(true, 0.10f)
+    .setSimpleGraph(true, clipAndFlip)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.RMatGraph
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 1 << scale
+val edgeCount = edgeFactor * vertexCount
+
+val clipAndFlip = false
+
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate()
+{% endhighlight %}
+</div>
+</div>
+
+### Singleton Edge Graph
+
+An undirected graph containing isolated two-paths.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexPairCount = 4;
+
+// note: configured with the number of vertex pairs
+Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.SingletonEdgeGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexPairCount = 4
+
+// note: configured with the number of vertex pairs
+val graph = new SingletonEdgeGraph(env.getJavaEnv, vertexPairCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="200"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="30" y1="40" x2="190" y2="40" />
+    <line x1="350" y1="40" x2="510" y2="40" />
+    <line x1="30" y1="160" x2="190" y2="160" />
+    <line x1="350" y1="160" x2="510" y2="160" />
+
+    <circle cx="30" cy="40" r="20" />
+    <text x="30" y="40">0</text>
+
+    <circle cx="190" cy="40" r="20" />
+    <text x="190" y="40">1</text>
+
+    <circle cx="350" cy="40" r="20" />
+    <text x="350" y="40">2</text>
+
+    <circle cx="510" cy="40" r="20" />
+    <text x="510" y="40">3</text>
+
+    <circle cx="30" cy="160" r="20" />
+    <text x="30" y="160">4</text>
+
+    <circle cx="190" cy="160" r="20" />
+    <text x="190" y="160">5</text>
+
+    <circle cx="350" cy="160" r="20" />
+    <text x="350" y="160">6</text>
+
+    <circle cx="510" cy="160" r="20" />
+    <text x="510" y="160">7</text>
+</svg>
+
+### Star Graph
+
+An undirected graph containing a single central vertex connected to all other leaf vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 6;
+
+Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.StarGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 6
+
+val graph = new StarGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+    xmlns="http://www.w3.org/2000/svg"
+    xmlns:xlink="http://www.w3.org/1999/xlink">
+
+    <line x1="270" y1="270" x2="270" y2="40" />
+    <line x1="270" y1="270" x2="489" y2="199" />
+    <line x1="270" y1="270" x2="405" y2="456" />
+    <line x1="270" y1="270" x2="135" y2="456" />
+    <line x1="270" y1="270" x2="51" y2="199" />
+
+    <circle cx="270" cy="270" r="20" />
+    <text x="270" y="270">0</text>
+
+    <circle cx="270" cy="40" r="20" />
+    <text x="270" y="40">1</text>
+
+    <circle cx="489" cy="199" r="20" />
+    <text x="489" y="199">2</text>
+
+    <circle cx="405" cy="456" r="20" />
+    <text x="405" y="456">3</text>
+
+    <circle cx="135" cy="456" r="20" />
+    <text x="135" y="456">4</text>
+
+    <circle cx="51" cy="199" r="20" />
+    <text x="51" y="199">5</text>
+</svg>
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/page/css/flink.css
----------------------------------------------------------------------
diff --git a/docs/page/css/flink.css b/docs/page/css/flink.css
index 9d259a1..1b4e4c4 100644
--- a/docs/page/css/flink.css
+++ b/docs/page/css/flink.css
@@ -245,3 +245,24 @@ img.offset {
 *:hover > .anchorjs-link {
     transition: color .25s linear;
 }
+
+/*=============================================================================
+                                 Gelly Graphs
+=============================================================================*/
+
+svg.graph line {
+    stroke: rgb(255,0,0);
+    stroke-width: 4;
+}
+
+svg.graph circle {
+    fill: red;
+    stroke: black;
+    stroke-width: 3;
+}
+
+svg.graph text {
+    dominant-baseline: central;
+    font-size: 32px;
+    text-anchor: middle;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
new file mode 100644
index 0000000..86a8ce6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.types.LongValue;
+
+import java.util.NoSuchElementException;
+
+/**
+ * The {@code LongValueSequenceIterator} is an iterator that returns a sequence of numbers (as {@code LongValue}s).
+ * The iterator is splittable (as defined by {@link SplittableIterator}), i.e., it can be divided into multiple
+ * iterators that each return a subsequence of the number sequence.
+ */
+@Public
+public class LongValueSequenceIterator extends SplittableIterator<LongValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The last number returned by the iterator (inclusive upper bound) */
+	private final long to;
+
+	/** The next number to be returned */
+	private long current;
+
+	/** Reused holder object: {@link #next()} overwrites it and returns this same instance on every call */
+	private LongValue currentValue = new LongValue();
+
+	/**
+	 * Creates a new splittable iterator, returning the range [from, to].
+	 * Both boundaries of the interval are inclusive; an {@link IllegalArgumentException} is thrown if {@code from > to}.
+	 *
+	 * @param from The first number returned by the iterator.
+	 * @param to The last number returned by the iterator.
+	 */
+	public LongValueSequenceIterator(long from, long to) {
+		if (from > to) {
+			throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
+		}
+
+		this.current = from;
+		this.to = to;
+	}
+
+
+	/**
+	 * Internal constructor that performs no range check, to allow for empty iterators ({@code from > to}).
+	 *
+	 * @param from The first number returned by the iterator.
+	 * @param to The last number returned by the iterator.
+	 * @param unused A dummy parameter to disambiguate the constructor.
+	 */
+	private LongValueSequenceIterator(long from, long to, boolean unused) {
+		this.current = from;
+		this.to = to;
+	}
+
+	public long getCurrent() {
+		return this.current;
+	}
+
+	public long getTo() {
+		return this.to;
+	}
+
+	@Override
+	public boolean hasNext() {
+		return current <= to;
+	}
+
+	@Override
+	public LongValue next() {
+		if (current <= to) {
+			currentValue.setValue(current++);
+			return currentValue;
+		} else {
+			throw new NoSuchElementException();
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public LongValueSequenceIterator[] split(int numPartitions) {
+		if (numPartitions < 1) {
+			throw new IllegalArgumentException("The number of partitions must be at least 1.");
+		}
+
+		if (numPartitions == 1) {
+			return new LongValueSequenceIterator[] { new LongValueSequenceIterator(current, to) };
+		}
+
+		// here, numPartitions >= 2 !!!
+
+		long elementsPerSplit;
+
+		if (to - current + 1 >= 0) {
+			elementsPerSplit = (to - current + 1) / numPartitions;
+		}
+		else {
+			// long overflow of the range (to - current + 1 wrapped around to a negative value).
+			// we compute based on half the distance, to prevent the overflow.
+			// in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE
+			// the latter needs a special case, because -Long.MIN_VALUE is not representable as a long
+			final long halfDiff; // must be positive
+
+			if (current == Long.MIN_VALUE) {
+				// this means to >= 0
+				halfDiff = (Long.MAX_VALUE/2+1) + to/2;
+			} else {
+				long posFrom = -current;
+				if (posFrom > to) {
+					halfDiff = to + ((posFrom - to) / 2);
+				} else {
+					halfDiff = posFrom + ((to - posFrom) / 2);
+				}
+			}
+			elementsPerSplit = halfDiff / numPartitions * 2;
+		}
+
+		if (elementsPerSplit < Long.MAX_VALUE) {
+			// figure out how many splits get one element in addition (the remainder of the division)
+			long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1;
+
+			// due to rounding (in the overflow branch above), each split may be one element short
+			if (numWithExtra > numPartitions) {
+				elementsPerSplit++;
+				numWithExtra -= numPartitions;
+
+				if (numWithExtra > numPartitions) {
+					throw new RuntimeException("Bug in splitting logic. To much rounding loss.");
+				}
+			}
+
+			LongValueSequenceIterator[] iters = new LongValueSequenceIterator[numPartitions];
+			long curr = current;
+			int i = 0;
+			for (; i < numWithExtra; i++) {
+				long next = curr + elementsPerSplit + 1;
+				iters[i] = new LongValueSequenceIterator(curr, next-1);
+				curr = next;
+			}
+			for (; i < numPartitions; i++) {
+				long next = curr + elementsPerSplit;
+				iters[i] = new LongValueSequenceIterator(curr, next-1, true);
+				curr = next;
+			}
+
+			return iters;
+		}
+		else {
+			// this can only be the case when there are two partitions
+			if (numPartitions != 2) {
+				throw new RuntimeException("Bug in splitting logic.");
+			}
+
+			return new LongValueSequenceIterator[] {
+				new LongValueSequenceIterator(current, current + elementsPerSplit),
+				new LongValueSequenceIterator(current + elementsPerSplit, to)
+			};
+		}
+	}
+
+
+	@Override
+	public int getMaximumNumberOfSplits() {
+		if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to-current+1 >= Integer.MAX_VALUE) {
+			return Integer.MAX_VALUE;
+		}
+		else {
+			return (int) (to-current+1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
new file mode 100644
index 0000000..3407690
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+
+/**
+ * Checks that {@link LongValueSequenceIterator#split(int)} produces the
+ * requested number of contiguous, evenly sized sub-ranges.
+ */
+public class LongValueSequenceIteratorTest extends TestLogger {
+
+	/** Ranges that are comfortably larger than the number of splits. */
+	@Test
+	public void testSplitRegular() {
+		testSplitting(new LongValueSequenceIterator(0, 10), 2);
+		testSplitting(new LongValueSequenceIterator(100, 100000), 7);
+		testSplitting(new LongValueSequenceIterator(-100, 0), 5);
+		testSplitting(new LongValueSequenceIterator(-100, 100), 3);
+	}
+
+	/** Ranges whose length approaches or exceeds what a {@code long} can count. */
+	@Test
+	public void testSplittingLargeRangesBy2() {
+		testSplitting(new LongValueSequenceIterator(0, Long.MAX_VALUE), 2);
+		testSplitting(new LongValueSequenceIterator(-1000000000L, Long.MAX_VALUE), 2);
+		testSplitting(new LongValueSequenceIterator(Long.MIN_VALUE, Long.MAX_VALUE), 2);
+	}
+
+	/** Ranges holding fewer elements than the number of requested splits. */
+	@Test
+	public void testSplittingTooSmallRanges() {
+		testSplitting(new LongValueSequenceIterator(0, 0), 2);
+		testSplitting(new LongValueSequenceIterator(-5, -5), 2);
+		testSplitting(new LongValueSequenceIterator(-5, -4), 3);
+		testSplitting(new LongValueSequenceIterator(10, 15), 10);
+	}
+
+	/**
+	 * Splits {@code iter} and verifies the split count, the outer bounds, the
+	 * absence of gaps between neighbours, and the balance of the split sizes.
+	 *
+	 * @param iter the iterator to split
+	 * @param numSplits the number of splits to request
+	 */
+	private static void testSplitting(LongValueSequenceIterator iter, int numSplits) {
+		LongValueSequenceIterator[] parts = iter.split(numSplits);
+
+		assertEquals(numSplits, parts.length);
+
+		// the first split starts where the source starts, the last ends where it ends
+		assertEquals(iter.getCurrent(), parts[0].getCurrent());
+		assertEquals(iter.getTo(), parts[numSplits-1].getTo());
+
+		// every split begins directly after its predecessor ends
+		for (int idx = 1; idx < parts.length; idx++) {
+			LongValueSequenceIterator previous = parts[idx-1];
+			assertEquals(previous.getTo() + 1, parts[idx].getCurrent());
+		}
+
+		testMaxSplitDiff(parts);
+	}
+
+
+	/**
+	 * Asserts that the spans of the given splits differ by at most one.
+	 *
+	 * @param iters the splits to compare
+	 */
+	private static void testMaxSplitDiff(LongValueSequenceIterator[] iters) {
+		long smallest = Long.MAX_VALUE;
+		long largest = Long.MIN_VALUE;
+
+		for (LongValueSequenceIterator part : iters) {
+			// an empty split (end before start) counts as span zero
+			long span = part.getTo() < part.getCurrent() ? 0 : part.getTo() - part.getCurrent();
+
+			// a negative span means the subtraction overflowed; treat it as maximal
+			if (span < 0) {
+				span = Long.MAX_VALUE;
+			}
+
+			smallest = Math.min(smallest, span);
+			largest = Math.max(largest, span);
+		}
+
+		assertTrue(largest == smallest || largest-1 == smallest);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index f284b90..89b6d17 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -209,7 +209,7 @@ public final class Utils {
 
 		@Override
 		public String toString() {
-			return "ChecksumHashCode " + this.checksum + ", count " + this.count;
+			return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
new file mode 100644
index 0000000..b9d6fbd
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Generate an RMat graph for Graph 500.
+ *
+ * Note that this does not yet implement permutation of vertex labels or edges.
+ *
+ * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
+ */
+public class Graph500 {
+
+	/** Default base-2 logarithm of the vertex count. */
+	public static final int DEFAULT_SCALE = 10;
+
+	/** Default number of generated edges per vertex. */
+	public static final int DEFAULT_EDGE_FACTOR = 16;
+
+	/** Default for the first argument passed to {@code RMatGraph#setSimpleGraph}. */
+	public static final boolean DEFAULT_SIMPLIFY = false;
+
+	/** Default for the second argument passed to {@code RMatGraph#setSimpleGraph}. */
+	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Generates the graph and prints, hashes, or writes its edges depending on
+	 * the {@code --output} argument; prints a usage message for any other value.
+	 *
+	 * @param args command-line arguments parsed with {@link ParameterTool}
+	 * @throws Exception if the Flink job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Generate RMat graph
+		int scale = parameters.getInt("scale", DEFAULT_SCALE);
+		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		// shift a long literal: "1 << scale" is computed as an int and wraps for scale >= 31
+		long vertexCount = 1L << scale;
+		long edgeCount = vertexCount * edgeFactor;
+
+		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
+		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+		// keep only the source and target labels of each edge
+		DataSet<Tuple2<LongValue,LongValue>> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.setSimpleGraph(simplify, clipAndFlip)
+			.generate()
+			.getEdges()
+			.project(0, 1);
+
+		// Print, hash, or write RMat graph to disk
+		switch (parameters.get("output", "")) {
+		case "print":
+			edges.print();
+			break;
+
+		case "hash":
+			System.out.println(DataSetUtils.checksumHashCode(edges));
+			break;
+
+		case "csv":
+			String filename = parameters.get("filename");
+
+			String rowDelimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+			String fieldDelimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+			edges.writeAsCsv(filename, rowDelimiter, fieldDelimiter);
+
+			// trigger execution of the job containing the CSV sink
+			env.execute();
+			break;
+		default:
+			System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator.");
+			System.out.println();
+			System.out.println("The graph matrix contains 2^scale vertices although not every vertex will");
+			System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges");
+			System.out.println("although some edges may be duplicates.");
+			System.out.println();
+			System.out.println("Note: this does not yet implement permutation of vertex labels or edges.");
+			System.out.println();
+			System.out.println("usage:");
+			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
+			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
+			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
+					" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+			// nothing was executed, so there is no runtime to report
+			return;
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
new file mode 100644
index 0000000..2eee6d7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+/**
+ * Base class for graph generators that stores the optional operator
+ * parallelism on behalf of its subclasses.
+ *
+ * @param <K> the key type for edge and vertex identifiers
+ * @param <VV> the value type for vertices
+ * @param <EV> the value type for edges
+ */
+public abstract class AbstractGraphGenerator<K, VV, EV> implements GraphGenerator<K, VV, EV> {
+
+	// Optional configuration; stays at -1 until setParallelism is called
+	// NOTE(review): -1 is presumably Flink's "use the default parallelism" marker - confirm
+	protected int parallelism = -1;
+
+	/**
+	 * Stores the parallelism that subclasses apply to the operators they create.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this generator, to allow call chaining
+	 */
+	@Override
+	public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
new file mode 100644
index 0000000..5339b14
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * Generator for a graph in which every vertex is linked to every other vertex.
+ *
+ * @see <a href="http://mathworld.wolfram.com/CompleteGraph.html">Complete Graph at Wolfram MathWorld</a>
+ */
+public class CompleteGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexCount;
+
+	/**
+	 * An undirected {@link Graph} connecting every distinct pair of vertices.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 * @throws IllegalArgumentException if {@code vertexCount} is not positive
+	 */
+	public CompleteGraph(ExecutionEnvironment env, long vertexCount) {
+		if (vertexCount <= 0) {
+			throw new IllegalArgumentException("Vertex count must be greater than zero");
+		}
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+	}
+
+	/**
+	 * Builds the vertex sequence {@code 0 .. vertexCount-1} and, for each label,
+	 * the edges from that vertex to all other vertices.
+	 *
+	 * @return the generated graph
+	 */
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		// Vertices
+		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+		DataSet<Edge<LongValue,NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.flatMap(new LinkVertexToAll(vertexCount))
+				.setParallelism(parallelism)
+				.name("Complete graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	/**
+	 * Emits one edge from the input vertex to each of the other vertices.
+	 *
+	 * Declared static so that instances of this function do not hold a
+	 * reference to the enclosing generator and its execution environment.
+	 */
+	@ForwardedFields("*->f0")
+	public static class LinkVertexToAll
+	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+		private final long vertexCount;
+
+		// reused for every emitted edge; only its value is updated
+		private final LongValue target = new LongValue();
+
+		// reused output object; the source is set per input, the target is shared with the field above
+		private final Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+		/**
+		 * @param vertexCount number of vertices in the graph
+		 */
+		public LinkVertexToAll(long vertexCount) {
+			this.vertexCount = vertexCount;
+		}
+
+		@Override
+		public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out)
+				throws Exception {
+			edge.f0 = source;
+
+			long s = source.getValue();
+
+			// start at the successor of the source and wrap around at vertexCount
+			long t = (s + 1) % vertexCount;
+
+			// stop once the walk arrives back at the source, so no self-loop is emitted
+			while (s != t) {
+				target.setValue(t);
+				out.collect(edge);
+
+				if (++t == vertexCount) {
+					t = 0;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
new file mode 100644
index 0000000..2671efe
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generator for a graph whose edges form a single cycle.
+ *
+ * @see <a href="http://mathworld.wolfram.com/CycleGraph.html">Cycle Graph at Wolfram MathWorld</a>
+ */
+public class CycleGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexCount;
+
+	/**
+	 * An undirected {@link Graph} where all edges form a single cycle.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 */
+	public CycleGraph(ExecutionEnvironment env, long vertexCount) {
+		if (vertexCount <= 0) {
+			throw new IllegalArgumentException("Vertex count must be greater than zero");
+		}
+
+		this.vertexCount = vertexCount;
+		this.env = env;
+	}
+
+	/**
+	 * Delegates to a {@link GridGraph} with a single dimension of
+	 * {@code vertexCount} vertices, passing {@code true} as the second
+	 * argument of {@code addDimension}.
+	 *
+	 * @return the generated graph
+	 */
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		GraphGenerator<LongValue,NullValue,NullValue> ring = new GridGraph(env)
+			.addDimension(vertexCount, true)
+			.setParallelism(parallelism);
+
+		return ring.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
new file mode 100644
index 0000000..05bfd89
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.Collections;
+
+/**
+ * Generator for a graph that has vertices but no edges.
+ *
+ * @see <a href="http://mathworld.wolfram.com/EmptyGraph.html">Empty Graph at Wolfram MathWorld</a>
+ */
+public class EmptyGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexCount;
+
+	/**
+	 * The {@link Graph} containing no edges.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 */
+	public EmptyGraph(ExecutionEnvironment env, long vertexCount) {
+		if (vertexCount <= 0) {
+			throw new IllegalArgumentException("Vertex count must be greater than zero");
+		}
+
+		this.vertexCount = vertexCount;
+		this.env = env;
+	}
+
+	/**
+	 * Pairs the vertex sequence {@code 0 .. vertexCount-1} with an edge set
+	 * created from an empty collection.
+	 *
+	 * @return the generated graph
+	 */
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		// Edges: an empty collection carries no elements to infer a type from,
+		// so the edge type is spelled out explicitly
+		TypeInformation<Edge<LongValue,NullValue>> edgeType = new TupleTypeInfo<>(
+			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
+
+		DataSource<Edge<LongValue,NullValue>> noEdges = env
+			.fromCollection(Collections.<Edge<LongValue,NullValue>>emptyList(), edgeType)
+				.setParallelism(parallelism)
+				.name("Empty edge set");
+
+		// Vertices
+		DataSet<Vertex<LongValue,NullValue>> vertexLabels = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Graph
+		return Graph.fromDataSet(vertexLabels, noEdges, env);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
new file mode 100644
index 0000000..8fece81
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.graph.Graph;
+
+/**
+ * Contract for classes that produce a {@link Graph}.
+ *
+ * Graph generators shall be
+ * <ul>
+ * <li>parallelizable, in order to create large datasets</li>
+ * <li>scale-free, generating the same graph regardless of parallelism</li>
+ * <li>thrifty, using as few operators as possible</li>
+ * </ul>
+ *
+ * Graph generators should prefer to emit edges sorted by the source label.
+ *
+ * @param <K> the key type for edge and vertex identifiers
+ * @param <VV> the value type for vertices
+ * @param <EV> the value type for edges
+ */
+public interface GraphGenerator<K, VV, EV> {
+
+	/**
+	 * Creates the graph described by this generator's configuration.
+	 *
+	 * @return generated graph
+	 */
+	Graph<K,VV,EV> generate();
+
+	/**
+	 * Sets the parallelism to use for the generator's operators.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	GraphGenerator<K,VV,EV> setParallelism(int parallelism);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
new file mode 100644
index 0000000..a7b5ce9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * Static helpers shared by the graph generators for creating vertex sets.
+ */
+public class GraphGeneratorUtils {
+
+	/**
+	 * Generates {@link Vertex Vertices} with sequential, numerical labels.
+	 *
+	 * The labels run from {@code 0} to {@code vertexCount - 1}.
+	 *
+	 * @param env the Flink execution environment.
+	 * @param parallelism operator parallelism
+	 * @param vertexCount number of sequential vertex labels
+	 * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices}
+	 */
+	public static DataSet<Vertex<LongValue,NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount-1);
+
+		DataSource<LongValue> vertexLabels = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Vertex iterators");
+
+		return vertexLabels
+			.map(new CreateVertex())
+				.setParallelism(parallelism)
+				.name("Vertex sequence");
+	}
+
+	/**
+	 * Wraps each label in a {@link Vertex} with a {@link NullValue} value.
+	 */
+	@ForwardedFields("*->f0")
+	private static class CreateVertex
+	implements MapFunction<LongValue, Vertex<LongValue,NullValue>> {
+
+		// a single output object is reused for every record
+		private final Vertex<LongValue,NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
+
+		@Override
+		public Vertex<LongValue, NullValue> map(LongValue value)
+				throws Exception {
+			vertex.f0 = value;
+
+			return vertex;
+		}
+	}
+
+	/**************************************************************************/
+
+	/**
+	 * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
+	 *
+	 * @param edges source {@link DataSet} of {@link Edge}s
+	 * @param parallelism operator parallelism
+	 * @param <K> label type
+	 * @param <EV> edge value type
+	 * @return {@link DataSet} of discovered {@link Vertex Vertices}
+	 *
+	 * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
+	 */
+	public static <K,EV> DataSet<Vertex<K,NullValue>> vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
+		// every edge contributes both of its endpoints ...
+		DataSet<Vertex<K,NullValue>> vertexSet = edges
+			.flatMap(new EmitSrcAndTarget<K, EV>())
+				.setParallelism(parallelism)
+				.name("Emit source and target labels");
+
+		// ... and duplicates are then removed
+		return vertexSet
+			.distinct()
+				.setParallelism(parallelism)
+				.name("Emit vertex labels");
+	}
+
+	/**
+	 * Emits a {@link Vertex} for the source label and another for the target
+	 * label of each {@link Edge}.
+	 *
+	 * @see Graph
+	 */
+	private static final class EmitSrcAndTarget<K,EV>
+	implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {
+
+		// a single output object is reused for both emitted vertices
+		private final Vertex<K,NullValue> output = new Vertex<>(null, NullValue.getInstance());
+
+		@Override
+		public void flatMap(Edge<K,EV> value, Collector<Vertex<K,NullValue>> out) throws Exception {
+			output.f0 = value.f0;
+			out.collect(output);
+			output.f0 = value.f1;
+			out.collect(output);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
new file mode 100644
index 0000000..399a2f9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/GridGraph.html">Grid Graph at Wolfram MathWorld</a>
+ */
+public class GridGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final List<Tuple2<Long,Boolean>> dimensions = new ArrayList<>();
+
+	private long vertexCount = 1;
+
+	/**
+	 * An undirected {@link Graph} connecting vertices in a regular tiling in one or more dimensions.
+	 *
+	 * @param env the Flink execution environment
+	 */
+	public GridGraph(ExecutionEnvironment env) {
+		this.env = env;
+	}
+
+	/**
+	 * Required configuration for each dimension of the graph.
+	 *
+	 * @param size number of vertices; must be greater than 1 as a dimension of size 1 has no effect
+	 * @param wrapEndpoints whether to connect first and last vertices; ignored for dimensions of size 2
+	 * @return this
+	 * @throws IllegalArgumentException if size is less than 2 or the total vertex count overflows a long
+	 */
+	public GridGraph addDimension(long size, boolean wrapEndpoints) {
+		if (size <= 1) {
+			throw new IllegalArgumentException("Dimension size must be greater than 1");
+		}
+
+		// the vertex count is the product of all sizes; reject a silent long overflow
+		if (vertexCount > Long.MAX_VALUE / size) {
+			throw new IllegalArgumentException("Total vertex count must not exceed Long.MAX_VALUE");
+		}
+
+		vertexCount *= size;
+
+		// prevent duplicate edges: wrapping a dimension of size 2 would repeat its only edge
+		dimensions.add(new Tuple2<>(size, wrapEndpoints && size != 2));
+
+		return this;
+	}
+
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		if (dimensions.isEmpty()) {
+			throw new RuntimeException("No dimensions added to GridGraph");
+		}
+
+		// Vertices
+		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+		DataSet<Edge<LongValue,NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.flatMap(new LinkVertexToNeighbors(vertexCount, dimensions))
+				.setParallelism(parallelism)
+				.name("Grid graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	/** Emits the edges from a vertex to its neighbors in each dimension; static so it does not capture the generator. */
+	@ForwardedFields("*->f0")
+	public static class LinkVertexToNeighbors
+	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+		private final long vertexCount;
+
+		private final List<Tuple2<Long,Boolean>> dimensions;
+
+		private LongValue target = new LongValue();
+
+		private Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+		public LinkVertexToNeighbors(long vertexCount, List<Tuple2<Long,Boolean>> dimensions) {
+			this.vertexCount = vertexCount;
+			this.dimensions = dimensions;
+		}
+
+		@Override
+		public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out) throws Exception {
+			edge.f0 = source;
+			long val = source.getValue();
+
+			// the distance between neighbors in a given iteration
+			long increment = vertexCount;
+
+			// the value in the remaining dimensions
+			long remainder = val;
+
+			for (Tuple2<Long,Boolean> dimension : dimensions) {
+				increment /= dimension.f0;
+
+				// the index within this dimension
+				long index = remainder / increment;
+
+				if (index > 0) {
+					target.setValue(val - increment);
+					out.collect(edge);
+				} else if (dimension.f1) {
+					target.setValue(val + increment * (dimension.f0 - 1));
+					out.collect(edge);
+				}
+
+				if (index < dimension.f0 - 1) {
+					target.setValue(val + increment);
+					out.collect(edge);
+				} else if (dimension.f1) {
+					target.setValue(val - increment * (dimension.f0 - 1));
+					out.collect(edge);
+				}
+
+				remainder %= increment;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
new file mode 100644
index 0000000..40968a0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/HypercubeGraph.html">Hypercube Graph at Wolfram MathWorld</a>
+ */
+public class HypercubeGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long dimensions;
+
+	/**
+	 * An undirected {@link Graph} where edges form an n-dimensional hypercube.
+	 *
+	 * @param env the Flink execution environment
+	 * @param dimensions number of dimensions, from 1 to 62 so that the 2^dimensions vertices fit in a long
+	 */
+	public HypercubeGraph(ExecutionEnvironment env, long dimensions) {
+		if (dimensions <= 0 || dimensions > 62) {
+			throw new IllegalArgumentException("Number of dimensions must be greater than zero and less than 63");
+		}
+
+		this.env = env;
+		this.dimensions = dimensions;
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate() {
+		// a hypercube is a grid graph with exactly two vertices in every dimension
+		GridGraph graph = new GridGraph(env);
+		for (long i = 0; i < dimensions; i++) {
+			graph.addDimension(2, false);
+		}
+
+		return graph
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
new file mode 100644
index 0000000..1aec723
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/PathGraph.html">Path Graph at Wolfram MathWorld</a>
+ */
+public class PathGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexCount;
+
+	/**
+	 * An undirected {@link Graph} where all edges form a single path, built as a one-dimensional {@link GridGraph}.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices; at least 2, the smallest dimension size {@link GridGraph} accepts
+	 */
+	public PathGraph(ExecutionEnvironment env, long vertexCount) {
+		if (vertexCount <= 1) {
+			throw new IllegalArgumentException("Vertex count must be greater than one");
+		}
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+	}
+
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		return new GridGraph(env)
+			.addDimension(vertexCount, false)
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
new file mode 100644
index 0000000..246d8bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.generator.random.BlockInfo;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/*
+ * @see <a href="http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf">R-MAT: A Recursive Model for Graph Mining</a>
+ */
+public class RMatGraph<T extends RandomGenerator>
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Default RMat constants
+	public static final float DEFAULT_A = 0.57f;
+
+	public static final float DEFAULT_B = 0.19f;
+
+	public static final float DEFAULT_C = 0.19f;
+
+	public static final float DEFAULT_NOISE = 0.10f;
+
+	// Required to create the DataSource
+	private ExecutionEnvironment env;
+
+	// Required configuration
+	private final RandomGenerableFactory<T> randomGenerableFactory;
+
+	private final long vertexCount;
+
+	private final long edgeCount;
+
+	// Optional configuration
+	private float A = DEFAULT_A;
+
+	private float B = DEFAULT_B;
+
+	private float C = DEFAULT_C;
+
+	private boolean noiseEnabled = false;
+
+	private float noise = DEFAULT_NOISE;
+
+	private boolean simpleGraph = false;
+
+	private boolean clipAndFlip = false;
+
+	/**
+	 * Generate a directed or undirected power-law {@link Graph} using the
+	 * Recursive Matrix (R-Mat) model.
+	 *
+	 * @param env the Flink execution environment
+	 * @param randomGeneratorFactory source of randomness
+	 * @param vertexCount number of vertices
+	 * @param edgeCount number of edges
+	 */
+	public RMatGraph(ExecutionEnvironment env, RandomGenerableFactory<T> randomGeneratorFactory, long vertexCount, long edgeCount) {
+		if (vertexCount <= 0) {
+			throw new IllegalArgumentException("Vertex count must be greater than zero");
+		}
+
+		if (edgeCount <= 0) {
+			throw new IllegalArgumentException("Edge count must be greater than zero");
+		}
+
+		this.env = env;
+		this.randomGenerableFactory = randomGeneratorFactory;
+		this.vertexCount = vertexCount;
+		this.edgeCount = edgeCount;
+	}
+
+	/**
+	 * The parameters for recursively subdividing the adjacency matrix.
+	 *
+	 * Setting A = B = C = 0.25 emulates the Erdős–Rényi model.
+	 *
+	 * Graph500 uses A = 0.57, B = C = 0.19.
+	 *
+	 * @param A likelihood of source bit = 0, target bit = 0
+	 * @param B likelihood of source bit = 0, target bit = 1
+	 * @param C likelihood of source bit = 1, target bit = 0
+	 * @return this
+	 */
+	public RMatGraph<T> setConstants(float A, float B, float C) {
+		if (A < 0.0f || B < 0.0f || C < 0.0f || A + B + C > 1.0f) {
+			throw new RuntimeException("RMat parameters A, B, and C must be non-negative and sum to less than or equal to one");
+		}
+
+		this.A = A;
+		this.B = B;
+		this.C = C;
+
+		return this;
+	}
+
+	/**
+	 * Enable and configure noise. Each edge is generated independently, but
+	 * when noise is enabled the parameters A, B, and C are randomly increased
+	 * or decreased, then normalized, by a fraction of the noise factor during
+	 * the computation of each bit.
+	 *
+	 * @param noiseEnabled whether to enable noise perturbation
+	 * @param noise strength of noise perturbation
+	 * @return this
+	 */
+	public RMatGraph<T> setNoise(boolean noiseEnabled, float noise) {
+		if (noise < 0.0f || noise > 2.0f) {
+			throw new RuntimeException("RMat parameter noise must be non-negative and less than or equal to 2.0");
+		}
+
+		this.noiseEnabled = noiseEnabled;
+		this.noise = noise;
+
+		return this;
+	}
+
+	/**
+	 * When configured for a simple graph duplicate edges and self-loops will
+	 * be removed. The clip-and-flip method removes edges where source < target
+	 * before symmetrizing the graph.
+	 *
+	 * @param simpleGraph whether to generate a simple graph
+	 * @param clipAndFlip method for generating simple graph
+	 * @return this
+	 */
+	public RMatGraph<T> setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) {
+		this.simpleGraph = simpleGraph;
+		this.clipAndFlip = clipAndFlip;
+
+		return this;
+	}
+
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
+
+		// Edges
+		int cyclesPerEdge = noiseEnabled ? 5 * scale : scale;
+
+		List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
+			.getRandomGenerables(edgeCount, cyclesPerEdge);
+
+		DataSet<Edge<LongValue,NullValue>> generatedEdges = env
+			.fromCollection(generatorBlocks)
+				.name("Random generators")
+			.rebalance()
+				.setParallelism(parallelism)
+				.name("Rebalance")
+			.flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip))
+				.setParallelism(parallelism)
+				.name("RMat graph edges");
+
+		DataSet<Edge<LongValue,NullValue>> edges;
+
+		if (simpleGraph) {
+			edges = generatedEdges
+				.distinct(1, 0)
+					.setParallelism(parallelism)
+					.name("Distinct");
+		} else {
+			edges = generatedEdges;
+		}
+
+		// Vertices
+		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	private static final class GenerateEdges<T extends RandomGenerator>
+	implements FlatMapFunction<BlockInfo<T>, Edge<LongValue,NullValue>> {
+
+		// Configuration
+		private final long vertexCount;
+
+		private final int scale;
+
+		private final float A;
+
+		private final float B;
+
+		private final float C;
+
+		private final float D;
+
+		private final boolean noiseEnabled;
+
+		private final float noise;
+
+		private final boolean simpleGraph;
+
+		private final boolean clipAndFlip;
+
+		// Output
+		private LongValue source = new LongValue();
+
+		private LongValue target = new LongValue();
+
+		private Edge<LongValue,NullValue> sourceToTarget = new Edge<>(source, target, NullValue.getInstance());
+
+		private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
+
+		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) {
+			this.vertexCount = vertexCount;
+			this.scale = scale;
+			this.A = A;
+			this.B = B;
+			this.C = C;
+			this.D = 1.0f - A - B - C;
+			this.noiseEnabled = noiseEnabled;
+			this.noise = noise;
+			this.simpleGraph = simpleGraph;
+			this.clipAndFlip = clipAndFlip;
+		}
+
+		@Override
+		public void flatMap(BlockInfo<T> blockInfo, Collector<Edge<LongValue,NullValue>> out)
+				throws Exception {
+			RandomGenerator rng = blockInfo.getRandomGenerable().generator();
+			long edgesToGenerate = blockInfo.getElementCount();
+
+			while (edgesToGenerate > 0) {
+				long x = 0;
+				long y = 0;
+
+				// matrix constants are reset for each edge
+				float a = A;
+				float b = B;
+				float c = C;
+				float d = D;
+
+				for (int bit = 0; bit < scale; bit++) {
+					// generated next bit for source and target
+					x <<= 1;
+					y <<= 1;
+
+					float random = rng.nextFloat();
+
+					if (random <= a) {
+					} else if (random <= a + b) {
+						y += 1;
+					} else if (random <= a + b + c) {
+						x += 1;
+					} else {
+						x += 1;
+						y += 1;
+					}
+
+					if (noiseEnabled) {
+						// noise is bounded such that all parameters remain non-negative
+						a *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+						b *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+						c *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+						d *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+
+						// normalize back to a + b + c + d = 1.0
+						float norm = 1.0f / (a + b + c + d);
+
+						a *= norm;
+						b *= norm;
+						c *= norm;
+
+						// could multiply by norm, but subtract to minimize rounding error
+						d = 1.0f - a - b - c;
+					}
+				}
+
+				// if vertexCount is not a power-of-2 then discard edges outside the vertex range
+				if (x < vertexCount && y < vertexCount) {
+					source.setValue(x);
+					target.setValue(y);
+
+					if (simpleGraph) {
+						if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) {
+							out.collect(sourceToTarget);
+							out.collect(targetToSource);
+						}
+					} else {
+						out.collect(sourceToTarget);
+					}
+
+					edgesToGenerate--;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
new file mode 100644
index 0000000..a714a29
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * A singleton-edge {@link Graph} contains one or more isolated two-paths. The in- and out-degree
+ * of every vertex is 1. For {@code n} vertices there are {@code n/2} components.
+ */
+public class SingletonEdgeGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexPairCount;
+
+	/**
+	 * An undirected {@link Graph} containing isolated two-paths.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexPairCount number of pairs of vertices
+	 */
+	public SingletonEdgeGraph(ExecutionEnvironment env, long vertexPairCount) {
+		if (vertexPairCount <= 0) {
+			throw new IllegalArgumentException("Vertex pair count must be greater than zero");
+		}
+
+		this.env = env;
+		this.vertexPairCount = vertexPairCount;
+	}
+
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		// Vertices
+		long vertexCount = 2 * this.vertexPairCount;
+
+		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
+
+		DataSet<Edge<LongValue,NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.map(new LinkVertexToSingletonNeighbor())
+				.setParallelism(parallelism)
+				.name("Singleton edge graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@ForwardedFields("*->f0")
+	private static class LinkVertexToSingletonNeighbor
+	implements MapFunction<LongValue, Edge<LongValue,NullValue>> {
+		// static so the enclosing generator is not captured; output objects are reused across calls
+		private LongValue source = new LongValue();
+
+		private LongValue target = new LongValue();
+
+		private Edge<LongValue,NullValue> edge = new Edge<>(source, target, NullValue.getInstance());
+
+		@Override
+		public Edge<LongValue,NullValue> map(LongValue value) throws Exception {
+			long val = value.getValue();
+			source.setValue(val);
+
+			// pair each even vertex with its odd successor and each odd vertex with its even predecessor
+			if (val % 2 == 0) {
+				target.setValue(val + 1);
+			} else {
+				target.setValue(val - 1);
+			}
+
+			return edge;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
new file mode 100644
index 0000000..cb99f30
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/StarGraph.html">Star Graph at Wolfram MathWorld</a>
+ */
+public class StarGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private final long vertexCount;
+
+	/**
+	 * An undirected {@link Graph} containing a single central {@link Vertex} connected to all other leaf vertices.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 */
+	public StarGraph(ExecutionEnvironment env, long vertexCount) {
+		if (vertexCount <= 0) {
+			throw new IllegalArgumentException("Vertex count must be greater than zero");
+		}
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+	}
+
+	@Override
+	public Graph<LongValue,NullValue,NullValue> generate() {
+		// Vertices
+		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges for leaves 1..vertexCount-1; NOTE(review): the range is (1, 0) when vertexCount is 1 - confirm the iterator accepts it
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(1, this.vertexCount - 1);
+
+		DataSet<Edge<LongValue,NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.flatMap(new LinkVertexToCenter())
+				.setParallelism(parallelism)
+				.name("Star graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@ForwardedFields("*->f0")
+	public static class LinkVertexToCenter
+	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+		// NOTE(review): "*->f0" holds for leafToCenter only; centerToLeaf carries the center in f0 - confirm annotation
+		private LongValue center = new LongValue(0);
+
+		private Edge<LongValue,NullValue> centerToLeaf = new Edge<>(center, null, NullValue.getInstance());
+
+		private Edge<LongValue,NullValue> leafToCenter = new Edge<>(null, center, NullValue.getInstance());
+
+		@Override
+		public void flatMap(LongValue leaf, Collector<Edge<LongValue,NullValue>> out)
+				throws Exception {
+			centerToLeaf.f1 = leaf;
+			out.collect(centerToLeaf);
+
+			leafToCenter.f0 = leaf;
+			out.collect(leafToCenter);
+		}
+	}
+}