You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/13 20:29:12 UTC
[2/2] flink git commit: [FLINK-2909] [gelly] Graph Generators
[FLINK-2909] [gelly] Graph Generators
Initial set of scale-free graph generators:
- Complete graph
- Cycle graph
- Empty graph
- Grid graph
- Hypercube graph
- Path graph
- RMat graph
- Singleton edge graph
- Star graph
This closes #1807
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7a1b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7a1b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7a1b8
Branch: refs/heads/master
Commit: b0a7a1b813b29dbfd5c959ee2929372c31befda8
Parents: 5350bc4
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Mar 15 16:59:02 2016 -0400
Committer: EC2 Default User <ec...@ip-10-0-35-26.ec2.internal>
Committed: Wed Apr 13 18:25:29 2016 +0000
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 644 +++++++++++++++++++
docs/page/css/flink.css | 21 +
.../flink/util/LongValueSequenceIterator.java | 193 ++++++
.../util/LongValueSequenceIteratorTest.java | 92 +++
.../java/org/apache/flink/api/java/Utils.java | 2 +-
.../apache/flink/graph/examples/Graph500.java | 121 ++++
.../graph/generator/AbstractGraphGenerator.java | 33 +
.../flink/graph/generator/CompleteGraph.java | 112 ++++
.../flink/graph/generator/CycleGraph.java | 60 ++
.../flink/graph/generator/EmptyGraph.java | 79 +++
.../flink/graph/generator/GraphGenerator.java | 51 ++
.../graph/generator/GraphGeneratorUtils.java | 119 ++++
.../apache/flink/graph/generator/GridGraph.java | 165 +++++
.../flink/graph/generator/HypercubeGraph.java | 65 ++
.../apache/flink/graph/generator/PathGraph.java | 60 ++
.../apache/flink/graph/generator/RMatGraph.java | 316 +++++++++
.../graph/generator/SingletonEdgeGraph.java | 107 +++
.../apache/flink/graph/generator/StarGraph.java | 100 +++
.../random/AbstractGeneratorFactory.java | 72 +++
.../flink/graph/generator/random/BlockInfo.java | 82 +++
.../random/JDKRandomGeneratorFactory.java | 72 +++
.../random/MersenneTwisterFactory.java | 72 +++
.../graph/generator/random/RandomGenerable.java | 41 ++
.../random/RandomGenerableFactory.java | 57 ++
.../graph/generator/AbstractGraphTest.java | 33 +
.../graph/generator/CompleteGraphTest.java | 84 +++
.../flink/graph/generator/CycleGraphTest.java | 83 +++
.../flink/graph/generator/EmptyGraphTest.java | 78 +++
.../flink/graph/generator/GridGraphTest.java | 93 +++
.../graph/generator/HypercubeGraphTest.java | 85 +++
.../flink/graph/generator/PathGraphTest.java | 83 +++
.../flink/graph/generator/RMatGraphTest.java | 70 ++
.../graph/generator/SingletonEdgeGraphTest.java | 84 +++
.../flink/graph/generator/StarGraphTest.java | 85 +++
.../apache/flink/graph/generator/TestUtils.java | 103 +++
35 files changed, 3616 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index d803be2..53d628d 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2012,3 +2012,647 @@ vertex represents a group of vertices and each edge represents a group of edges
vertex and edge in the output graph stores the common group value and the number of represented elements.
{% top %}
+
+Graph Generators
+-----------
+
+Gelly provides a collection of scalable graph generators. Each generator is
+
+* parallelizable, in order to create large datasets
+* scale-free, generating the same graph regardless of parallelism
+* thrifty, using as few operators as possible
+
+Graph generators are configured using the builder pattern. The parallelism of generator
+operators can be set explicitly by calling `setParallelism(parallelism)`. Lowering the
+parallelism will reduce the allocation of memory and network buffers.
+
+Graph-specific configuration must be called first, then configuration common to all
+generators, and lastly the call to `generate()`. The following example configures a
+grid graph with two dimensions, configures the parallelism, and generates the graph.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+boolean wrapEndpoints = false;
+
+int parallelism = 4;
+
+Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+ .addDimension(2, wrapEndpoints)
+ .addDimension(4, wrapEndpoints)
+ .setParallelism(parallelism)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.GridGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val wrapEndpoints = false
+
+val parallelism = 4
+
+val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).setParallelism(parallelism).generate()
+{% endhighlight %}
+</div>
+</div>
+
+### Provided graph generators
+
+* [Complete Graph](#complete-graph)
+* [Cycle Graph](#cycle-graph)
+* [Empty Graph](#empty-graph)
+* [Grid Graph](#grid-graph)
+* [Hypercube Graph](#hypercube-graph)
+* [Path Graph](#path-graph)
+* [RMat Graph](#rmat-graph)
+* [Singleton Edge Graph](#singleton-edge-graph)
+* [Star Graph](#star-graph)
+
+### Complete Graph
+
+An undirected graph connecting every distinct pair of vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CompleteGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CompleteGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="270" y1="40" x2="489" y2="199" />
+ <line x1="270" y1="40" x2="405" y2="456" />
+ <line x1="270" y1="40" x2="135" y2="456" />
+ <line x1="270" y1="40" x2="51" y2="199" />
+
+ <line x1="489" y1="199" x2="405" y2="456" />
+ <line x1="489" y1="199" x2="135" y2="456" />
+ <line x1="489" y1="199" x2="51" y2="199" />
+
+ <line x1="405" y1="456" x2="135" y2="456" />
+ <line x1="405" y1="456" x2="51" y2="199" />
+
+ <line x1="135" y1="456" x2="51" y2="199" />
+
+ <circle cx="270" cy="40" r="20" />
+ <text x="270" y="40">0</text>
+
+ <circle cx="489" cy="199" r="20" />
+ <text x="489" y="199">1</text>
+
+ <circle cx="405" cy="456" r="20" />
+ <text x="405" y="456">2</text>
+
+ <circle cx="135" cy="456" r="20" />
+ <text x="135" y="456">3</text>
+
+ <circle cx="51" cy="199" r="20" />
+ <text x="51" y="199">4</text>
+</svg>
+
+### Cycle Graph
+
+An undirected graph where all edges form a single cycle.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CycleGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="270" y1="40" x2="489" y2="199" />
+ <line x1="489" y1="199" x2="405" y2="456" />
+ <line x1="405" y1="456" x2="135" y2="456" />
+ <line x1="135" y1="456" x2="51" y2="199" />
+ <line x1="51" y1="199" x2="270" y2="40" />
+
+ <circle cx="270" cy="40" r="20" />
+ <text x="270" y="40">0</text>
+
+ <circle cx="489" cy="199" r="20" />
+ <text x="489" y="199">1</text>
+
+ <circle cx="405" cy="456" r="20" />
+ <text x="405" y="456">2</text>
+
+ <circle cx="135" cy="456" r="20" />
+ <text x="135" y="456">3</text>
+
+ <circle cx="51" cy="199" r="20" />
+ <text x="51" y="199">4</text>
+</svg>
+
+### Empty Graph
+
+The graph containing no edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.EmptyGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new EmptyGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="80"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <circle cx="30" cy="40" r="20" />
+ <text x="30" y="40">0</text>
+
+ <circle cx="150" cy="40" r="20" />
+ <text x="150" y="40">1</text>
+
+ <circle cx="270" cy="40" r="20" />
+ <text x="270" y="40">2</text>
+
+ <circle cx="390" cy="40" r="20" />
+ <text x="390" y="40">3</text>
+
+ <circle cx="510" cy="40" r="20" />
+ <text x="510" y="40">4</text>
+</svg>
+
+### Grid Graph
+
+An undirected graph connecting vertices in a regular tiling in one or more dimensions.
+Each dimension is configured separately. When the dimension size is at least three, the
+endpoints are optionally connected by setting `wrapEndpoints`. Changing the following
+example to `addDimension(4, true)` would connect `0` to `3` and `4` to `7`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+boolean wrapEndpoints = false;
+
+Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+ .addDimension(2, wrapEndpoints)
+ .addDimension(4, wrapEndpoints)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.GridGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val wrapEndpoints = false
+
+val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="200"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="30" y1="40" x2="510" y2="40" />
+ <line x1="30" y1="160" x2="510" y2="160" />
+
+ <line x1="30" y1="40" x2="30" y2="160" />
+ <line x1="190" y1="40" x2="190" y2="160" />
+ <line x1="350" y1="40" x2="350" y2="160" />
+ <line x1="510" y1="40" x2="510" y2="160" />
+
+ <circle cx="30" cy="40" r="20" />
+ <text x="30" y="40">0</text>
+
+ <circle cx="190" cy="40" r="20" />
+ <text x="190" y="40">1</text>
+
+ <circle cx="350" cy="40" r="20" />
+ <text x="350" y="40">2</text>
+
+ <circle cx="510" cy="40" r="20" />
+ <text x="510" y="40">3</text>
+
+ <circle cx="30" cy="160" r="20" />
+ <text x="30" y="160">4</text>
+
+ <circle cx="190" cy="160" r="20" />
+ <text x="190" y="160">5</text>
+
+ <circle cx="350" cy="160" r="20" />
+ <text x="350" y="160">6</text>
+
+ <circle cx="510" cy="160" r="20" />
+ <text x="510" y="160">7</text>
+</svg>
+
+### Hypercube Graph
+
+An undirected graph where edges form an n-dimensional hypercube. Each vertex
+in a hypercube connects to one other vertex in each dimension.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long dimensions = 3;
+
+Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.HypercubeGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val dimensions = 3
+
+val graph = new HypercubeGraph(env.getJavaEnv, dimensions).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="320"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="190" y1="120" x2="350" y2="120" />
+ <line x1="190" y1="200" x2="350" y2="200" />
+ <line x1="190" y1="120" x2="190" y2="200" />
+ <line x1="350" y1="120" x2="350" y2="200" />
+
+ <line x1="30" y1="40" x2="510" y2="40" />
+ <line x1="30" y1="280" x2="510" y2="280" />
+ <line x1="30" y1="40" x2="30" y2="280" />
+ <line x1="510" y1="40" x2="510" y2="280" />
+
+ <line x1="190" y1="120" x2="30" y2="40" />
+ <line x1="350" y1="120" x2="510" y2="40" />
+ <line x1="190" y1="200" x2="30" y2="280" />
+ <line x1="350" y1="200" x2="510" y2="280" />
+
+ <circle cx="190" cy="120" r="20" />
+ <text x="190" y="120">0</text>
+
+ <circle cx="350" cy="120" r="20" />
+ <text x="350" y="120">1</text>
+
+ <circle cx="190" cy="200" r="20" />
+ <text x="190" y="200">2</text>
+
+ <circle cx="350" cy="200" r="20" />
+ <text x="350" y="200">3</text>
+
+ <circle cx="30" cy="40" r="20" />
+ <text x="30" y="40">4</text>
+
+ <circle cx="510" cy="40" r="20" />
+ <text x="510" y="40">5</text>
+
+ <circle cx="30" cy="280" r="20" />
+ <text x="30" y="280">6</text>
+
+ <circle cx="510" cy="280" r="20" />
+ <text x="510" y="280">7</text>
+</svg>
+
+### Path Graph
+
+An undirected graph where all edges form a single path.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.PathGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new PathGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="80"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="30" y1="40" x2="510" y2="40" />
+
+ <circle cx="30" cy="40" r="20" />
+ <text x="30" y="40">0</text>
+
+ <circle cx="150" cy="40" r="20" />
+ <text x="150" y="40">1</text>
+
+ <circle cx="270" cy="40" r="20" />
+ <text x="270" y="40">2</text>
+
+ <circle cx="390" cy="40" r="20" />
+ <text x="390" y="40">3</text>
+
+ <circle cx="510" cy="40" r="20" />
+ <text x="510" y="40">4</text>
+</svg>
+
+### RMat Graph
+
+A directed or undirected power-law graph generated using the
+[Recursive Matrix (R-Mat)](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model.
+
+RMat is a stochastic generator configured with a source of randomness implementing the
+`RandomGenerableFactory` interface. Provided implementations are `JDKRandomGeneratorFactory`
+and `MersenneTwisterFactory`. These generate an initial sequence of random values which are
+then used as seeds for generating the edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+long vertexCount = 1L << scale;
+long edgeCount = edgeFactor * vertexCount;
+
+Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.RMatGraph
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val rnd = new JDKRandomGeneratorFactory()
+
+val vertexCount = 1L << scale
+val edgeCount = edgeFactor * vertexCount
+
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+The default RMat constants can be overridden as shown in the following example.
+The constants define the interdependence of bits from each generated edge's source
+and target labels. The RMat noise can be enabled and progressively perturbs the
+constants while generating each edge.
+
+The RMat generator can be configured to produce a simple graph by removing self-loops
+and duplicate edges. Symmetrization is performed either by a "clip-and-flip" throwing away
+the half matrix above the diagonal or a full "flip" preserving and mirroring all edges.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+long vertexCount = 1L << scale;
+long edgeCount = edgeFactor * vertexCount;
+
+boolean clipAndFlip = false;
+
+Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .setConstants(0.57f, 0.19f, 0.19f)
+ .setNoise(true, 0.10f)
+ .setSimpleGraph(true, clipAndFlip)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.RMatGraph
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val rnd = new JDKRandomGeneratorFactory()
+
+val vertexCount = 1L << scale
+val edgeCount = edgeFactor * vertexCount
+
+val clipAndFlip = false
+
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate()
+{% endhighlight %}
+</div>
+</div>
+
+### Singleton Edge Graph
+
+An undirected graph containing isolated two-vertex paths, i.e. disjoint edges each connecting its own pair of vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexPairCount = 4;
+
+// note: configured with the number of vertex pairs
+Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.SingletonEdgeGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexPairCount = 4
+
+// note: configured with the number of vertex pairs
+val graph = new SingletonEdgeGraph(env.getJavaEnv, vertexPairCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="200"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="30" y1="40" x2="190" y2="40" />
+ <line x1="350" y1="40" x2="510" y2="40" />
+ <line x1="30" y1="160" x2="190" y2="160" />
+ <line x1="350" y1="160" x2="510" y2="160" />
+
+ <circle cx="30" cy="40" r="20" />
+ <text x="30" y="40">0</text>
+
+ <circle cx="190" cy="40" r="20" />
+ <text x="190" y="40">1</text>
+
+ <circle cx="350" cy="40" r="20" />
+ <text x="350" y="40">2</text>
+
+ <circle cx="510" cy="40" r="20" />
+ <text x="510" y="40">3</text>
+
+ <circle cx="30" cy="160" r="20" />
+ <text x="30" y="160">4</text>
+
+ <circle cx="190" cy="160" r="20" />
+ <text x="190" y="160">5</text>
+
+ <circle cx="350" cy="160" r="20" />
+ <text x="350" y="160">6</text>
+
+ <circle cx="510" cy="160" r="20" />
+ <text x="510" y="160">7</text>
+</svg>
+
+### Star Graph
+
+An undirected graph containing a single central vertex connected to all other leaf vertices.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 6;
+
+Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+ .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.StarGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 6
+
+val graph = new StarGraph(env.getJavaEnv, vertexCount).generate()
+{% endhighlight %}
+</div>
+</div>
+
+<svg class="graph" width="540" height="540"
+ xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink">
+
+ <line x1="270" y1="270" x2="270" y2="40" />
+ <line x1="270" y1="270" x2="489" y2="199" />
+ <line x1="270" y1="270" x2="405" y2="456" />
+ <line x1="270" y1="270" x2="135" y2="456" />
+ <line x1="270" y1="270" x2="51" y2="199" />
+
+ <circle cx="270" cy="270" r="20" />
+ <text x="270" y="270">0</text>
+
+ <circle cx="270" cy="40" r="20" />
+ <text x="270" y="40">1</text>
+
+ <circle cx="489" cy="199" r="20" />
+ <text x="489" y="199">2</text>
+
+ <circle cx="405" cy="456" r="20" />
+ <text x="405" y="456">3</text>
+
+ <circle cx="135" cy="456" r="20" />
+ <text x="135" y="456">4</text>
+
+ <circle cx="51" cy="199" r="20" />
+ <text x="51" y="199">5</text>
+</svg>
+
+{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/page/css/flink.css
----------------------------------------------------------------------
diff --git a/docs/page/css/flink.css b/docs/page/css/flink.css
index 9d259a1..1b4e4c4 100644
--- a/docs/page/css/flink.css
+++ b/docs/page/css/flink.css
@@ -245,3 +245,24 @@ img.offset {
*:hover > .anchorjs-link {
transition: color .25s linear;
}
+
+/*=============================================================================
+ Gelly Graphs
+=============================================================================*/
+
+/* Edges drawn as <line> elements inside <svg class="graph"> diagrams. */
+svg.graph line {
+ stroke: rgb(255,0,0);
+ stroke-width: 4;
+}
+
+/* Vertices drawn as <circle> elements: red fill with a black outline. */
+svg.graph circle {
+ fill: red;
+ stroke: black;
+ stroke-width: 3;
+}
+
+/* Vertex labels: the diagrams place each <text> at its circle's center, and
+   these two alignment properties center the label on that point. */
+svg.graph text {
+ dominant-baseline: central;
+ font-size: 32px;
+ text-anchor: middle;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
new file mode 100644
index 0000000..86a8ce6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.types.LongValue;
+
+import java.util.NoSuchElementException;
+
+/**
+ * The {@code LongValueSequenceIterator} is an iterator that returns a sequence of numbers (as {@code LongValue})s.
+ * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple
+ * iterators that each return a subsequence of the number sequence.
+ */
+@Public
+public class LongValueSequenceIterator extends SplittableIterator<LongValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The last number returned by the iterator */
+ private final long to;
+
+ /** The next number to be returned */
+ private long current;
+
+ /** The next value to be returned */
+ private LongValue currentValue = new LongValue();
+
+ /**
+ * Creates a new splittable iterator, returning the range [from, to].
+ * Both boundaries of the interval are inclusive.
+ *
+ * @param from The first number returned by the iterator.
+ * @param to The last number returned by the iterator.
+ */
+ public LongValueSequenceIterator(long from, long to) {
+ if (from > to) {
+ throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
+ }
+
+ this.current = from;
+ this.to = to;
+ }
+
+
+ /**
+ * Internal constructor to allow for empty iterators.
+ *
+ * @param from The first number returned by the iterator.
+ * @param to The last number returned by the iterator.
+ * @param unused A dummy parameter to disambiguate the constructor.
+ */
+ private LongValueSequenceIterator(long from, long to, boolean unused) {
+ this.current = from;
+ this.to = to;
+ }
+
+ public long getCurrent() {
+ return this.current;
+ }
+
+ public long getTo() {
+ return this.to;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current <= to;
+ }
+
+ @Override
+ public LongValue next() {
+ if (current <= to) {
+ currentValue.setValue(current++);
+ return currentValue;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LongValueSequenceIterator[] split(int numPartitions) {
+ if (numPartitions < 1) {
+ throw new IllegalArgumentException("The number of partitions must be at least 1.");
+ }
+
+ if (numPartitions == 1) {
+ return new LongValueSequenceIterator[] { new LongValueSequenceIterator(current, to) };
+ }
+
+ // here, numPartitions >= 2 !!!
+
+ long elementsPerSplit;
+
+ if (to - current + 1 >= 0) {
+ elementsPerSplit = (to - current + 1) / numPartitions;
+ }
+ else {
+ // long overflow of the range.
+ // we compute based on half the distance, to prevent the overflow.
+ // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE
+ // the later needs a special case
+ final long halfDiff; // must be positive
+
+ if (current == Long.MIN_VALUE) {
+ // this means to >= 0
+ halfDiff = (Long.MAX_VALUE/2+1) + to/2;
+ } else {
+ long posFrom = -current;
+ if (posFrom > to) {
+ halfDiff = to + ((posFrom - to) / 2);
+ } else {
+ halfDiff = posFrom + ((to - posFrom) / 2);
+ }
+ }
+ elementsPerSplit = halfDiff / numPartitions * 2;
+ }
+
+ if (elementsPerSplit < Long.MAX_VALUE) {
+ // figure out how many get one in addition
+ long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1;
+
+ // based on rounding errors, we may have lost one)
+ if (numWithExtra > numPartitions) {
+ elementsPerSplit++;
+ numWithExtra -= numPartitions;
+
+ if (numWithExtra > numPartitions) {
+ throw new RuntimeException("Bug in splitting logic. To much rounding loss.");
+ }
+ }
+
+ LongValueSequenceIterator[] iters = new LongValueSequenceIterator[numPartitions];
+ long curr = current;
+ int i = 0;
+ for (; i < numWithExtra; i++) {
+ long next = curr + elementsPerSplit + 1;
+ iters[i] = new LongValueSequenceIterator(curr, next-1);
+ curr = next;
+ }
+ for (; i < numPartitions; i++) {
+ long next = curr + elementsPerSplit;
+ iters[i] = new LongValueSequenceIterator(curr, next-1, true);
+ curr = next;
+ }
+
+ return iters;
+ }
+ else {
+ // this can only be the case when there are two partitions
+ if (numPartitions != 2) {
+ throw new RuntimeException("Bug in splitting logic.");
+ }
+
+ return new LongValueSequenceIterator[] {
+ new LongValueSequenceIterator(current, current + elementsPerSplit),
+ new LongValueSequenceIterator(current + elementsPerSplit, to)
+ };
+ }
+ }
+
+
+ @Override
+ public int getMaximumNumberOfSplits() {
+ if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to-current+1 >= Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ else {
+ return (int) (to-current+1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
new file mode 100644
index 0000000..3407690
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+
+public class LongValueSequenceIteratorTest extends TestLogger {
+
+ @Test
+ public void testSplitRegular() {
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, 10), 2);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(100, 100000), 7);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-100, 0), 5);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-100, 100), 3);
+ }
+
+ @Test
+ public void testSplittingLargeRangesBy2() {
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, Long.MAX_VALUE), 2);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-1000000000L, Long.MAX_VALUE), 2);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(Long.MIN_VALUE, Long.MAX_VALUE), 2);
+ }
+
+ @Test
+ public void testSplittingTooSmallRanges() {
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, 0), 2);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-5, -5), 2);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-5, -4), 3);
+ testSplitting(new org.apache.flink.util.LongValueSequenceIterator(10, 15), 10);
+ }
+
+ private static final void testSplitting(org.apache.flink.util.LongValueSequenceIterator iter, int numSplits) {
+ org.apache.flink.util.LongValueSequenceIterator[] splits = iter.split(numSplits);
+
+ assertEquals(numSplits, splits.length);
+
+ // test start and end of range
+ assertEquals(iter.getCurrent(), splits[0].getCurrent());
+ assertEquals(iter.getTo(), splits[numSplits-1].getTo());
+
+ // test continuous range
+ for (int i = 1; i < splits.length; i++) {
+ assertEquals(splits[i-1].getTo() + 1, splits[i].getCurrent());
+ }
+
+ testMaxSplitDiff(splits);
+ }
+
+
+ private static final void testMaxSplitDiff(org.apache.flink.util.LongValueSequenceIterator[] iters) {
+ long minSplitSize = Long.MAX_VALUE;
+ long maxSplitSize = Long.MIN_VALUE;
+
+ for (LongValueSequenceIterator iter : iters) {
+ long diff;
+ if (iter.getTo() < iter.getCurrent()) {
+ diff = 0;
+ } else {
+ diff = iter.getTo() - iter.getCurrent();
+ }
+ if (diff < 0) {
+ diff = Long.MAX_VALUE;
+ }
+
+ minSplitSize = Math.min(minSplitSize, diff);
+ maxSplitSize = Math.max(maxSplitSize, diff);
+ }
+
+ assertTrue(maxSplitSize == minSplitSize || maxSplitSize-1 == minSplitSize);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index f284b90..89b6d17 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -209,7 +209,7 @@ public final class Utils {
@Override
public String toString() {
- return "ChecksumHashCode " + this.checksum + ", count " + this.count;
+ return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
new file mode 100644
index 0000000..b9d6fbd
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Generate an RMat graph for Graph 500.
+ *
+ * Note that this does not yet implement permutation of vertex labels or edges.
+ *
+ * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
+ */
+public class Graph500 {
+
+ public static final int DEFAULT_SCALE = 10;
+
+ public static final int DEFAULT_EDGE_FACTOR = 16;
+
+ public static final boolean DEFAULT_SIMPLIFY = false;
+
+ public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+ public static void main(String[] args) throws Exception {
+ // Set up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+
+ // Generate RMat graph
+ int scale = parameters.getInt("scale", DEFAULT_SCALE);
+ int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+ RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+ long vertexCount = 1 << scale;
+ long edgeCount = vertexCount * edgeFactor;
+
+ boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
+ boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+ DataSet<Tuple2<LongValue,LongValue>> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .setSimpleGraph(simplify, clipAndFlip)
+ .generate()
+ .getEdges()
+ .project(0, 1);
+
+ // Print, hash, or write RMat graph to disk
+ switch (parameters.get("output", "")) {
+ case "print":
+ edges.print();
+ break;
+
+ case "hash":
+ System.out.println(DataSetUtils.checksumHashCode(edges));
+ break;
+
+ case "csv":
+ String filename = parameters.get("filename");
+
+ String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+ String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+ edges.writeAsCsv(filename, row_delimiter, field_delimiter);
+
+ env.execute();
+ break;
+ default:
+ System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator.");
+ System.out.println();
+ System.out.println("The graph matrix contains 2^scale vertices although not every vertex will");
+ System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges");
+ System.out.println("although some edges may be duplicates.");
+ System.out.println();
+ System.out.println("Note: this does not yet implement permutation of vertex labels or edges.");
+ System.out.println();
+ System.out.println("usage:");
+ System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
+ System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
+ System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
+ " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
+
+ return;
+ }
+
+ JobExecutionResult result = env.getLastJobExecutionResult();
+
+ NumberFormat nf = NumberFormat.getInstance();
+ System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
new file mode 100644
index 0000000..2eee6d7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+public abstract class AbstractGraphGenerator<K, VV, EV>
+implements GraphGenerator<K, VV, EV> {
+
+ // Optional configuration
+ protected int parallelism = -1;
+
+ @Override
+ public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
new file mode 100644
index 0000000..5339b14
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/*
+ * @see <a href="http://mathworld.wolfram.com/CompleteGraph.html">Complete Graph at Wolfram MathWorld</a>
+ */
+public class CompleteGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+ // Required to create the DataSource
+ private final ExecutionEnvironment env;
+
+ // Required configuration
+ private long vertexCount;
+
+ /**
+ * An undirected {@link Graph} connecting every distinct pair of vertices.
+ *
+ * @param env the Flink execution environment
+ * @param vertexCount number of vertices
+ */
+ public CompleteGraph(ExecutionEnvironment env, long vertexCount) {
+ if (vertexCount <= 0) {
+ throw new IllegalArgumentException("Vertex count must be greater than zero");
+ }
+
+ this.env = env;
+ this.vertexCount = vertexCount;
+ }
+
+ @Override
+ public Graph<LongValue,NullValue,NullValue> generate() {
+ // Vertices
+ DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+ // Edges
+ LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+ DataSet<Edge<LongValue,NullValue>> edges = env
+ .fromParallelCollection(iterator, LongValue.class)
+ .setParallelism(parallelism)
+ .name("Edge iterators")
+ .flatMap(new LinkVertexToAll(vertexCount))
+ .setParallelism(parallelism)
+ .name("Complete graph edges");
+
+ // Graph
+ return Graph.fromDataSet(vertices, edges, env);
+ }
+
+ @ForwardedFields("*->f0")
+ public class LinkVertexToAll
+ implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+ private final long vertexCount;
+
+ private LongValue target = new LongValue();
+
+ private Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+ public LinkVertexToAll(long vertex_count) {
+ this.vertexCount = vertex_count;
+ }
+
+ @Override
+ public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out)
+ throws Exception {
+ edge.f0 = source;
+
+ long s = source.getValue();
+ long t = (s + 1) % vertexCount;
+
+ while (s != t) {
+ target.setValue(t);
+ out.collect(edge);
+
+ if (++t == vertexCount) {
+ t = 0;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
new file mode 100644
index 0000000..2671efe
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/*
+ * @see <a href="http://mathworld.wolfram.com/CycleGraph.html">Cycle Graph at Wolfram MathWorld</a>
+ */
+public class CycleGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+ // Required to create the DataSource
+ private final ExecutionEnvironment env;
+
+ // Required configuration
+ private long vertexCount;
+
+ /**
+ * An undirected {@link Graph} where all edges form a single cycle.
+ *
+ * @param env the Flink execution environment
+ * @param vertexCount number of vertices
+ */
+ public CycleGraph(ExecutionEnvironment env, long vertexCount) {
+ if (vertexCount <= 0) {
+ throw new IllegalArgumentException("Vertex count must be greater than zero");
+ }
+
+ this.env = env;
+ this.vertexCount = vertexCount;
+ }
+
+ @Override
+ public Graph<LongValue,NullValue,NullValue> generate() {
+ return new GridGraph(env)
+ .addDimension(vertexCount, true)
+ .setParallelism(parallelism)
+ .generate();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
new file mode 100644
index 0000000..05bfd89
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.Collections;
+
+/*
+ * @see <a href="http://mathworld.wolfram.com/EmptyGraph.html">Empty Graph at Wolfram MathWorld</a>
+ */
+public class EmptyGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+ // Required to create the DataSource
+ private final ExecutionEnvironment env;
+
+ // Required configuration
+ private long vertexCount;
+
+ /**
+ * The {@link Graph} containing no edges.
+ *
+ * @param env the Flink execution environment
+ * @param vertexCount number of vertices
+ */
+ public EmptyGraph(ExecutionEnvironment env, long vertexCount) {
+ if (vertexCount <= 0) {
+ throw new IllegalArgumentException("Vertex count must be greater than zero");
+ }
+
+ this.env = env;
+ this.vertexCount = vertexCount;
+ }
+
+ @Override
+ public Graph<LongValue,NullValue,NullValue> generate() {
+ // Vertices
+ DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+ // Edges
+ TypeInformation<Edge<LongValue,NullValue>> typeInformation = new TupleTypeInfo<>(
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
+
+ DataSource<Edge<LongValue,NullValue>> edges = env
+ .fromCollection(Collections.<Edge<LongValue,NullValue>>emptyList(), typeInformation)
+ .setParallelism(parallelism)
+ .name("Empty edge set");
+
+ // Graph
+ return Graph.fromDataSet(vertices, edges, env);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
new file mode 100644
index 0000000..8fece81
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.graph.Graph;
+
+/**
+ * Graph generators shall be
+ * - parallelizable, in order to create large datasets
+ * - scale-free, generating the same graph regardless of parallelism
+ * - thrifty, using as few operators as possible
+ *
+ * Graph generators should prefer to emit edges sorted by the source label.
+ *
+ * @param <K> the key type for edge and vertex identifiers
+ * @param <VV> the value type for vertices
+ * @param <EV> the value type for edges
+ */
+public interface GraphGenerator<K, VV, EV> {
+
+ /**
+ * Generates the configured graph.
+ *
+ * @return generated graph
+ */
+ Graph<K,VV,EV> generate();
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ GraphGenerator<K,VV,EV> setParallelism(int parallelism);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
new file mode 100644
index 0000000..a7b5ce9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+public class GraphGeneratorUtils {
+
+ /**
+ * Generates {@link Vertex Vertices} with sequential, numerical labels.
+ *
+ * @param env the Flink execution environment.
+ * @param parallelism operator parallelism
+ * @param vertexCount number of sequential vertex labels
+ * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices}
+ */
+ public static DataSet<Vertex<LongValue,NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
+ LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount-1);
+
+ DataSource<LongValue> vertexLabels = env
+ .fromParallelCollection(iterator, LongValue.class)
+ .setParallelism(parallelism)
+ .name("Vertex iterators");
+
+ DataSet<Vertex<LongValue,NullValue>> vertexSequence = vertexLabels
+ .map(new CreateVertex())
+ .setParallelism(parallelism)
+ .name("Vertex sequence");
+
+ return vertexSequence;
+ }
+
+ @ForwardedFields("*->f0")
+ private static class CreateVertex
+ implements MapFunction<LongValue, Vertex<LongValue,NullValue>> {
+
+ private Vertex<LongValue,NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
+
+ @Override
+ public Vertex<LongValue, NullValue> map(LongValue value)
+ throws Exception {
+ vertex.f0 = value;
+
+ return vertex;
+ }
+ }
+
+ /**************************************************************************/
+
+ /**
+ * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
+ *
+ * @param edges source {@link DataSet} of {@link Edge}s
+ * @param parallelism operator parallelism
+ * @param <K> label type
+ * @param <EV> edge value type
+ * @return {@link DataSet} of discovered {@link Vertex Vertices}
+ *
+ * @see {@link Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)}
+ */
+ public static <K,EV> DataSet<Vertex<K,NullValue>> vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
+ DataSet<Vertex<K,NullValue>> vertexSet = edges
+ .flatMap(new EmitSrcAndTarget<K, EV>())
+ .setParallelism(parallelism)
+ .name("Emit source and target labels");
+
+ DataSet<Vertex<K,NullValue>> distinctVertexSet = vertexSet
+ .distinct()
+ .setParallelism(parallelism)
+ .name("Emit vertex labels");
+
+ return distinctVertexSet;
+ }
+
+ /**
+ * @see {@link Graph.EmitSrcAndTarget}
+ */
+ private static final class EmitSrcAndTarget<K,EV>
+ implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {
+
+ private Vertex<K,NullValue> output = new Vertex<>(null, new NullValue());
+
+ @Override
+ public void flatMap(Edge<K,EV> value, Collector<Vertex<K,NullValue>> out) throws Exception {
+ output.f0 = value.f0;
+ out.collect(output);
+ output.f0 = value.f1;
+ out.collect(output);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
new file mode 100644
index 0000000..399a2f9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * @see <a href="http://mathworld.wolfram.com/GridGraph.html">Grid Graph at Wolfram MathWorld</a>
+ */
+public class GridGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+ // Required to create the DataSource
+ private final ExecutionEnvironment env;
+
+ // Required configuration
+ private List<Tuple2<Long,Boolean>> dimensions = new ArrayList<>();
+
+ private long vertexCount = 1;
+
+ /**
+ * An undirected {@link Graph} connecting vertices in a regular tiling in one or more dimensions.
+ *
+ * @param env the Flink execution environment
+ */
+ public GridGraph(ExecutionEnvironment env) {
+ this.env = env;
+ }
+
+ /**
+ * Required configuration for each dimension of the graph.
+ *
+ * @param size number of vertices; dimensions of size 1 are prohibited due to having no effect
+ * on the generated graph
+ * @param wrapEndpoints whether to connect first and last vertices; this has no effect on
+ * dimensions of size 2
+ * @return this
+ */
+ public GridGraph addDimension(long size, boolean wrapEndpoints) {
+ if (size <= 1) {
+ throw new IllegalArgumentException("Dimension size must be greater than 1");
+ }
+
+ vertexCount *= size;
+
+ // prevent duplicate edges
+ if (size == 2) {
+ wrapEndpoints = false;
+ }
+
+ dimensions.add(new Tuple2<>(size, wrapEndpoints));
+
+ return this;
+ }
+
+ @Override
+ public Graph<LongValue,NullValue,NullValue> generate() {
+ if (dimensions.isEmpty()) {
+ throw new RuntimeException("No dimensions added to GridGraph");
+ }
+
+ // Vertices
+ DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+ // Edges
+ LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+ DataSet<Edge<LongValue,NullValue>> edges = env
+ .fromParallelCollection(iterator, LongValue.class)
+ .setParallelism(parallelism)
+ .name("Edge iterators")
+ .flatMap(new LinkVertexToNeighbors(vertexCount, dimensions))
+ .setParallelism(parallelism)
+ .name("Grid graph edges");
+
+ // Graph
+ return Graph.fromDataSet(vertices, edges, env);
+ }
+
+ @ForwardedFields("*->f0")
+ public class LinkVertexToNeighbors
+ implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+ private long vertexCount;
+
+ private List<Tuple2<Long,Boolean>> dimensions;
+
+ private LongValue target = new LongValue();
+
+ private Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+ public LinkVertexToNeighbors(long vertexCount, List<Tuple2<Long,Boolean>> dimensions) {
+ this.vertexCount = vertexCount;
+ this.dimensions = dimensions;
+ }
+
+ @Override
+ public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out)
+ throws Exception {
+ edge.f0 = source;
+ long val = source.getValue();
+
+ // the distance between neighbors in a given iteration
+ long increment = vertexCount;
+
+ // the value in the remaining dimensions
+ long remainder = val;
+
+ for (Tuple2<Long,Boolean> dimension : dimensions) {
+ increment /= dimension.f0;
+
+ // the index within this dimension
+ long index = remainder / increment;
+
+ if (index > 0) {
+ target.setValue(val - increment);
+ out.collect(edge);
+ } else if (dimension.f1) {
+ target.setValue(val + increment * (dimension.f0 - 1));
+ out.collect(edge);
+ }
+
+ if (index < dimension.f0 - 1) {
+ target.setValue(val + increment);
+ out.collect(edge);
+ } else if (dimension.f1) {
+ target.setValue(val - increment * (dimension.f0 - 1));
+ out.collect(edge);
+ }
+
+ remainder %= increment;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
new file mode 100644
index 0000000..40968a0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Generates an n-dimensional hypercube {@link Graph}, built as a grid with
+ * {@code dimensions} dimensions, each of size 2 and without wrapped endpoints.
+ *
+ * @see <a href="http://mathworld.wolfram.com/HypercubeGraph.html">Hypercube Graph at Wolfram MathWorld</a>
+ */
+public class HypercubeGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+    // 2^dimensions vertices must fit in a signed long; 2^62 is the largest power of two that does
+    private static final long MAXIMUM_DIMENSIONS = 62;
+
+    // Required to create the DataSource
+    private final ExecutionEnvironment env;
+
+    // Required configuration
+    private final long dimensions;
+
+    /**
+     * An undirected {@link Graph} where edges form an n-dimensional hypercube.
+     *
+     * @param env the Flink execution environment
+     * @param dimensions number of dimensions
+     * @throws IllegalArgumentException if dimensions is not positive or the
+     *         resulting vertex count would overflow a long
+     */
+    public HypercubeGraph(ExecutionEnvironment env, long dimensions) {
+        if (dimensions <= 0) {
+            throw new IllegalArgumentException("Number of dimensions must be greater than zero");
+        }
+
+        if (dimensions > MAXIMUM_DIMENSIONS) {
+            throw new IllegalArgumentException("Number of dimensions must be at most " + MAXIMUM_DIMENSIONS);
+        }
+
+        this.env = env;
+        this.dimensions = dimensions;
+    }
+
+    @Override
+    public Graph<LongValue, NullValue, NullValue> generate() {
+        GridGraph graph = new GridGraph(env);
+
+        // long counter to match the type of the dimension count
+        for (long i = 0; i < dimensions; i++) {
+            graph.addDimension(2, false);
+        }
+
+        return graph
+            .setParallelism(parallelism)
+            .generate();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
new file mode 100644
index 0000000..1aec723
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+/*
+ * @see <a href="http://mathworld.wolfram.com/PathGraph.html">Path Graph at Wolfram MathWorld</a>
+ */
+public class PathGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+ // Required to create the DataSource
+ private final ExecutionEnvironment env;
+
+ // Required configuration
+ private long vertexCount;
+
+ /**
+ * An undirected {@link Graph} where all edges form a single path.
+ *
+ * @param env the Flink execution environment
+ * @param vertexCount number of vertices
+ */
+ public PathGraph(ExecutionEnvironment env, long vertexCount) {
+ if (vertexCount <= 0) {
+ throw new IllegalArgumentException("Vertex count must be greater than zero");
+ }
+
+ this.env = env;
+ this.vertexCount = vertexCount;
+ }
+
+ @Override
+ public Graph<LongValue,NullValue,NullValue> generate() {
+ return new GridGraph(env)
+ .addDimension(vertexCount, false)
+ .setParallelism(parallelism)
+ .generate();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
new file mode 100644
index 0000000..246d8bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.generator.random.BlockInfo;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Generates a power-law {@link Graph} using the Recursive Matrix (R-Mat) model.
+ *
+ * <p>Each edge is produced by descending {@code scale} levels of the adjacency
+ * matrix, where {@code scale} is the number of bits needed to label
+ * {@code vertexCount} vertices. At each level one quadrant is chosen with
+ * probability A, B, C, or D = 1 - A - B - C.
+ *
+ * @param <T> the type of random generator supplied by the factory
+ * @see <a href="http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf">R-MAT: A Recursive Model for Graph Mining</a>
+ */
+public class RMatGraph<T extends RandomGenerator>
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+    // Default RMat constants
+    public static final float DEFAULT_A = 0.57f;
+
+    public static final float DEFAULT_B = 0.19f;
+
+    public static final float DEFAULT_C = 0.19f;
+
+    public static final float DEFAULT_NOISE = 0.10f;
+
+    // Required to create the DataSource
+    private final ExecutionEnvironment env;
+
+    // Required configuration
+    private final RandomGenerableFactory<T> randomGenerableFactory;
+
+    private final long vertexCount;
+
+    private final long edgeCount;
+
+    // Optional configuration
+    private float A = DEFAULT_A;
+
+    private float B = DEFAULT_B;
+
+    private float C = DEFAULT_C;
+
+    private boolean noiseEnabled = false;
+
+    private float noise = DEFAULT_NOISE;
+
+    private boolean simpleGraph = false;
+
+    private boolean clipAndFlip = false;
+
+    /**
+     * Generate a directed or undirected power-law {@link Graph} using the
+     * Recursive Matrix (R-Mat) model.
+     *
+     * @param env the Flink execution environment
+     * @param randomGeneratorFactory source of randomness
+     * @param vertexCount number of vertices
+     * @param edgeCount number of edges
+     * @throws IllegalArgumentException if vertexCount or edgeCount is not positive
+     */
+    public RMatGraph(ExecutionEnvironment env, RandomGenerableFactory<T> randomGeneratorFactory, long vertexCount, long edgeCount) {
+        if (vertexCount <= 0) {
+            throw new IllegalArgumentException("Vertex count must be greater than zero");
+        }
+
+        if (edgeCount <= 0) {
+            throw new IllegalArgumentException("Edge count must be greater than zero");
+        }
+
+        this.env = env;
+        this.randomGenerableFactory = randomGeneratorFactory;
+        this.vertexCount = vertexCount;
+        this.edgeCount = edgeCount;
+    }
+
+    /**
+     * The parameters for recursively subdividing the adjacency matrix.
+     *
+     * <p>Setting A = B = C = 0.25 emulates the Erdős–Rényi model.
+     *
+     * <p>Graph500 uses A = 0.57, B = C = 0.19.
+     *
+     * @param A likelihood of source bit = 0, target bit = 0
+     * @param B likelihood of source bit = 0, target bit = 1
+     * @param C likelihood of source bit = 1, target bit = 0
+     * @return this
+     * @throws IllegalArgumentException if any parameter is negative or NaN, or
+     *         if A + B + C is greater than one
+     */
+    public RMatGraph<T> setConstants(float A, float B, float C) {
+        // expressed as a negated conjunction so that NaN values are also rejected
+        if (!(A >= 0.0f && B >= 0.0f && C >= 0.0f && A + B + C <= 1.0f)) {
+            throw new IllegalArgumentException("RMat parameters A, B, and C must be non-negative and sum to less than or equal to one");
+        }
+
+        this.A = A;
+        this.B = B;
+        this.C = C;
+
+        return this;
+    }
+
+    /**
+     * Enable and configure noise. Each edge is generated independently, but
+     * when noise is enabled, the parameters A, B, C, and D are each multiplied
+     * by a random factor in {@code [1 - noise/2, 1 + noise/2)} and then
+     * renormalized after the computation of each bit.
+     *
+     * @param noiseEnabled whether to enable noise perturbation
+     * @param noise strength of noise perturbation
+     * @return this
+     * @throws IllegalArgumentException if noise is NaN or outside [0.0, 2.0]
+     */
+    public RMatGraph<T> setNoise(boolean noiseEnabled, float noise) {
+        // expressed as a negated conjunction so that NaN is also rejected
+        if (!(noise >= 0.0f && noise <= 2.0f)) {
+            throw new IllegalArgumentException("RMat parameter noise must be non-negative and less than or equal to 2.0");
+        }
+
+        this.noiseEnabled = noiseEnabled;
+        this.noise = noise;
+
+        return this;
+    }
+
+    /**
+     * When configured for a simple graph, each generated edge is emitted in both
+     * directions, and duplicate edges and self-loops are removed. The
+     * clip-and-flip method keeps only generated edges where source > target
+     * before symmetrizing the graph.
+     *
+     * @param simpleGraph whether to generate a simple graph
+     * @param clipAndFlip method for generating simple graph
+     * @return this
+     */
+    public RMatGraph<T> setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) {
+        this.simpleGraph = simpleGraph;
+        this.clipAndFlip = clipAndFlip;
+
+        return this;
+    }
+
+    @Override
+    public Graph<LongValue,NullValue,NullValue> generate() {
+        // number of bits needed to label vertices 0 .. vertexCount - 1
+        int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
+
+        // Edges
+        // with noise enabled each bit draws five random values instead of one
+        int cyclesPerEdge = noiseEnabled ? 5 * scale : scale;
+
+        List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
+            .getRandomGenerables(edgeCount, cyclesPerEdge);
+
+        DataSet<Edge<LongValue,NullValue>> generatedEdges = env
+            .fromCollection(generatorBlocks)
+                .name("Random generators")
+            .rebalance()
+                .setParallelism(parallelism)
+                .name("Rebalance")
+            .flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip))
+                .setParallelism(parallelism)
+                .name("RMat graph edges");
+
+        DataSet<Edge<LongValue,NullValue>> edges;
+
+        if (simpleGraph) {
+            // edges were emitted in both directions; remove duplicate pairs
+            edges = generatedEdges
+                .distinct(1, 0)
+                    .setParallelism(parallelism)
+                    .name("Distinct");
+        } else {
+            edges = generatedEdges;
+        }
+
+        // Vertices
+        DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
+
+        // Graph
+        return Graph.fromDataSet(vertices, edges, env);
+    }
+
+    /**
+     * Generates the requested number of in-range edges for one block of
+     * randomness by recursively selecting a quadrant for each bit of the
+     * source and target labels.
+     */
+    private static final class GenerateEdges<T extends RandomGenerator>
+    implements FlatMapFunction<BlockInfo<T>, Edge<LongValue,NullValue>> {
+
+        // Configuration
+        private final long vertexCount;
+
+        private final int scale;
+
+        private final float A;
+
+        private final float B;
+
+        private final float C;
+
+        private final float D;
+
+        private final boolean noiseEnabled;
+
+        private final float noise;
+
+        private final boolean simpleGraph;
+
+        private final boolean clipAndFlip;
+
+        // Output
+        private LongValue source = new LongValue();
+
+        private LongValue target = new LongValue();
+
+        private Edge<LongValue,NullValue> sourceToTarget = new Edge<>(source, target, NullValue.getInstance());
+
+        private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
+
+        public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) {
+            this.vertexCount = vertexCount;
+            this.scale = scale;
+            this.A = A;
+            this.B = B;
+            this.C = C;
+            this.D = 1.0f - A - B - C;
+            this.noiseEnabled = noiseEnabled;
+            this.noise = noise;
+            this.simpleGraph = simpleGraph;
+            this.clipAndFlip = clipAndFlip;
+        }
+
+        @Override
+        public void flatMap(BlockInfo<T> blockInfo, Collector<Edge<LongValue,NullValue>> out)
+                throws Exception {
+            RandomGenerator rng = blockInfo.getRandomGenerable().generator();
+            long edgesToGenerate = blockInfo.getElementCount();
+
+            while (edgesToGenerate > 0) {
+                long x = 0;
+                long y = 0;
+
+                // matrix constants are reset for each edge
+                float a = A;
+                float b = B;
+                float c = C;
+                float d = D;
+
+                for (int bit = 0; bit < scale; bit++) {
+                    // generated next bit for source and target
+                    x <<= 1;
+                    y <<= 1;
+
+                    float random = rng.nextFloat();
+
+                    if (random <= a) {
+                        // quadrant A: both bits remain 0
+                    } else if (random <= a + b) {
+                        y += 1;
+                    } else if (random <= a + b + c) {
+                        x += 1;
+                    } else {
+                        x += 1;
+                        y += 1;
+                    }
+
+                    if (noiseEnabled) {
+                        // noise is bounded such that all parameters remain non-negative
+                        a *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+                        b *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+                        c *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+                        d *= 1.0 - noise / 2 + rng.nextFloat() * noise;
+
+                        // normalize back to a + b + c + d = 1.0
+                        float norm = 1.0f / (a + b + c + d);
+
+                        a *= norm;
+                        b *= norm;
+                        c *= norm;
+
+                        // could multiply by norm, but subtract to minimize rounding error
+                        d = 1.0f - a - b - c;
+                    }
+                }
+
+                // if vertexCount is not a power-of-2 then discard edges outside the vertex range
+                if (x < vertexCount && y < vertexCount) {
+                    source.setValue(x);
+                    target.setValue(y);
+
+                    if (simpleGraph) {
+                        if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) {
+                            out.collect(sourceToTarget);
+                            out.collect(targetToSource);
+                        }
+                    } else {
+                        out.collect(sourceToTarget);
+                    }
+
+                    // counted even when a simple-graph edge is dropped above
+                    edgesToGenerate--;
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
new file mode 100644
index 0000000..a714a29
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * A singleton-edge {@link Graph} contains one or more isolated edges: vertex
+ * {@code 2i} is linked to vertex {@code 2i + 1} in both directions. The in- and
+ * out-degree of every vertex is 1. For {@code n} vertices there are {@code n/2}
+ * components.
+ */
+public class SingletonEdgeGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+    // Required to create the DataSource
+    private final ExecutionEnvironment env;
+
+    // Required configuration
+    private final long vertexPairCount;
+
+    /**
+     * An undirected {@link Graph} containing isolated pairs of connected vertices.
+     *
+     * @param env the Flink execution environment
+     * @param vertexPairCount number of pairs of vertices
+     * @throws IllegalArgumentException if vertexPairCount is not positive or if
+     *         twice its value would overflow a long
+     */
+    public SingletonEdgeGraph(ExecutionEnvironment env, long vertexPairCount) {
+        if (vertexPairCount <= 0) {
+            throw new IllegalArgumentException("Vertex pair count must be greater than zero");
+        }
+
+        // generate() computes the vertex count as 2 * vertexPairCount
+        if (vertexPairCount > Long.MAX_VALUE / 2) {
+            throw new IllegalArgumentException("Vertex pair count must be at most " + (Long.MAX_VALUE / 2));
+        }
+
+        this.env = env;
+        this.vertexPairCount = vertexPairCount;
+    }
+
+    @Override
+    public Graph<LongValue,NullValue,NullValue> generate() {
+        // Vertices
+        long vertexCount = 2 * this.vertexPairCount;
+
+        DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+        // Edges: every vertex emits the edge to its partner, yielding both directions
+        LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
+
+        DataSet<Edge<LongValue,NullValue>> edges = env
+            .fromParallelCollection(iterator, LongValue.class)
+                .setParallelism(parallelism)
+                .name("Edge iterators")
+            .map(new LinkVertexToSingletonNeighbor())
+                .setParallelism(parallelism)
+                .name("Singleton edge graph edges");
+
+        // Graph
+        return Graph.fromDataSet(vertices, edges, env);
+    }
+
+    /**
+     * Maps a vertex to the edge linking it with its partner: an even vertex
+     * {@code v} links to {@code v + 1} and an odd vertex {@code v} to {@code v - 1}.
+     * Declared static so the function does not capture the enclosing generator.
+     */
+    @ForwardedFields("*->f0")
+    private static class LinkVertexToSingletonNeighbor
+    implements MapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+        private LongValue source = new LongValue();
+
+        private LongValue target = new LongValue();
+
+        private Edge<LongValue,NullValue> edge = new Edge<>(source, target, NullValue.getInstance());
+
+        @Override
+        public Edge<LongValue,NullValue> map(LongValue value) throws Exception {
+            long val = value.getValue();
+
+            source.setValue(val);
+
+            if (val % 2 == 0) {
+                target.setValue(val + 1);
+            } else {
+                target.setValue(val - 1);
+            }
+
+            return edge;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
new file mode 100644
index 0000000..cb99f30
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+
+/**
+ * Generates a star {@link Graph}: vertex 0 is the center and every other
+ * vertex is a leaf linked to the center in both directions.
+ *
+ * @see <a href="http://mathworld.wolfram.com/StarGraph.html">Star Graph at Wolfram MathWorld</a>
+ */
+public class StarGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+    // The center plus at least one leaf. With a single vertex, the leaf range
+    // 1 .. vertexCount - 1 handed to LongValueSequenceIterator would be inverted
+    // (from > to), which that iterator presumably rejects.
+    private static final long MINIMUM_VERTEX_COUNT = 2;
+
+    // Required to create the DataSource
+    private final ExecutionEnvironment env;
+
+    // Required configuration
+    private final long vertexCount;
+
+    /**
+     * An undirected {@link Graph} containing a single central {@link Vertex} connected to all other leaf vertices.
+     *
+     * @param env the Flink execution environment
+     * @param vertexCount number of vertices
+     * @throws IllegalArgumentException if vertexCount is less than two
+     */
+    public StarGraph(ExecutionEnvironment env, long vertexCount) {
+        if (vertexCount <= 0) {
+            throw new IllegalArgumentException("Vertex count must be greater than zero");
+        }
+
+        if (vertexCount < MINIMUM_VERTEX_COUNT) {
+            throw new IllegalArgumentException("Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
+        }
+
+        this.env = env;
+        this.vertexCount = vertexCount;
+    }
+
+    @Override
+    public Graph<LongValue,NullValue,NullValue> generate() {
+        // Vertices
+        DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+        // Edges: one input element per leaf, i.e. every vertex except the center 0
+        LongValueSequenceIterator iterator = new LongValueSequenceIterator(1, this.vertexCount - 1);
+
+        DataSet<Edge<LongValue,NullValue>> edges = env
+            .fromParallelCollection(iterator, LongValue.class)
+                .setParallelism(parallelism)
+                .name("Edge iterators")
+            .flatMap(new LinkVertexToCenter())
+                .setParallelism(parallelism)
+                .name("Star graph edges");
+
+        // Graph
+        return Graph.fromDataSet(vertices, edges, env);
+    }
+
+    /**
+     * Links each leaf vertex to the center vertex 0 in both directions.
+     *
+     * <p>Intentionally has no forwarded-fields annotation: the center-to-leaf edge
+     * stores the input in {@code f1} while its {@code f0} is the center, so
+     * {@code "*->f0"} does not hold for every emitted record.
+     */
+    public class LinkVertexToCenter
+    implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+
+        private LongValue center = new LongValue(0);
+
+        private Edge<LongValue,NullValue> centerToLeaf = new Edge<>(center, null, NullValue.getInstance());
+
+        private Edge<LongValue,NullValue> leafToCenter = new Edge<>(null, center, NullValue.getInstance());
+
+        @Override
+        public void flatMap(LongValue leaf, Collector<Edge<LongValue,NullValue>> out)
+                throws Exception {
+            centerToLeaf.f1 = leaf;
+            out.collect(centerToLeaf);
+
+            leafToCenter.f0 = leaf;
+            out.collect(leafToCenter);
+        }
+    }
+}