You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/11 16:43:08 UTC

[1/4] flink git commit: [hotfix] Refactor to use multi-catch

Repository: flink
Updated Branches:
  refs/heads/master fca8caea7 -> 704098725


[hotfix] Refactor to use multi-catch

This closes #3866


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70409872
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70409872
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70409872

Branch: refs/heads/master
Commit: 7040987258de276ca446a7bcf8b92a2e9437cb17
Parents: 3ee8c69
Author: Rodrigo Bonifacio <rb...@gmail.com>
Authored: Wed May 10 11:54:44 2017 -0300
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu May 11 12:42:25 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/AvroOutputFormat.java  | 4 +---
 .../flink/api/java/typeutils/runtime/PojoComparator.java     | 4 +---
 .../runtime/iterative/concurrent/BlockingBackChannel.java    | 4 +---
 .../flink/runtime/operators/hash/CompactingHashTable.java    | 8 ++------
 .../org/apache/flink/runtime/operators/util/TaskConfig.java  | 4 +---
 5 files changed, 6 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70409872/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 1db45a5..ae90362 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -125,9 +125,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			datumWriter = new SpecificDatumWriter<E>(avroValueType);
 			try {
 				schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-			} catch (InstantiationException e) {
-				throw new RuntimeException(e.getMessage());
-			} catch (IllegalAccessException e) {
+			} catch (InstantiationException | IllegalAccessException e) {
 				throw new RuntimeException(e.getMessage());
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/70409872/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 945abc8..a3f4280 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -131,9 +131,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		try {
 			this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
 					InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot copy serializer", e);
-		} catch (ClassNotFoundException e) {
+		} catch (IOException | ClassNotFoundException e) {
 			throw new RuntimeException("Cannot copy serializer", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/70409872/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
index 8631c99..067bbfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
@@ -51,9 +51,7 @@ public class BlockingBackChannel {
 	public DataInputView getReadEndAfterSuperstepEnded() {
 		try {
 			return queue.take().switchBuffers();
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		} catch (IOException e) {
+		} catch (InterruptedException | IOException e) {
 			throw new RuntimeException(e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/70409872/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index eacb02c..67ca342 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -451,12 +451,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 						this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
 					}
 					return newPointer;
-				}
-				catch (EOFException ex) {
-					throw new RuntimeException("Memory ran out. Compaction failed. " +
-							getMemoryConsumptionString() + " Message: " + ex.getMessage());
-				}
-				catch (IndexOutOfBoundsException ex) {
+				} 
+				catch (EOFException | IndexOutOfBoundsException ex) {
 					throw new RuntimeException("Memory ran out. Compaction failed. " +
 							getMemoryConsumptionString() + " Message: " + ex.getMessage());
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/70409872/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 71c0405..a5decf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -286,9 +286,7 @@ public class TaskConfig implements Serializable {
 	public <T> UserCodeWrapper<T> getStubWrapper(ClassLoader cl) {
 		try {
 			return (UserCodeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, STUB_OBJECT, cl);
-		} catch (ClassNotFoundException e) {
-			throw new CorruptConfigurationException("Could not read the user code wrapper: " + e.getMessage(), e);
-		} catch (IOException e) {
+		} catch (ClassNotFoundException | IOException e) {
 			throw new CorruptConfigurationException("Could not read the user code wrapper: " + e.getMessage(), e);
 		}
 	}


[3/4] flink git commit: [FLINK-6393] [gelly] Add Circulant and Echo graph generators

Posted by gr...@apache.org.
[FLINK-6393] [gelly] Add Circulant and Echo graph generators

This closes #3802


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ee8c69a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ee8c69a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ee8c69a

Branch: refs/heads/master
Commit: 3ee8c69aa4390a8d51b33f262f719fb1a5474d51
Parents: 54c8826
Author: FlorianFan <fa...@163.com>
Authored: Thu Apr 27 20:41:53 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu May 11 12:42:25 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/graph_generators.md         |  91 ++++++-
 .../java/org/apache/flink/graph/Runner.java     |  11 +-
 .../graph/drivers/input/CirculantGraph.java     | 129 ++++++++++
 .../flink/graph/drivers/input/EchoGraph.java    |  67 +++++
 .../flink/graph/drivers/input/GridGraph.java    |   3 +-
 .../flink/graph/drivers/EdgeListITCase.java     |  90 ++++++-
 .../flink/graph/generator/CirculantGraph.java   | 246 +++++++++++++++++++
 .../flink/graph/generator/CompleteGraph.java    |  57 +----
 .../apache/flink/graph/generator/EchoGraph.java |  82 +++++++
 .../apache/flink/graph/generator/GridGraph.java |   4 +-
 .../graph/generator/CirculantGraphTest.java     |  88 +++++++
 .../flink/graph/generator/EchoGraphTest.java    | 128 ++++++++++
 12 files changed, 926 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/docs/dev/libs/gelly/graph_generators.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_generators.md b/docs/dev/libs/gelly/graph_generators.md
index 2532ee4..d4ad229 100644
--- a/docs/dev/libs/gelly/graph_generators.md
+++ b/docs/dev/libs/gelly/graph_generators.md
@@ -72,6 +72,42 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDime
 </div>
 </div>
 
+## Circulant Graph
+
+A [circulant graph](http://mathworld.wolfram.com/CirculantGraph.html) is an
+[oriented graph](http://mathworld.wolfram.com/OrientedGraph.html) configured
+with one or more contiguous ranges of offsets. Edges connect integer vertex IDs
+whose difference equals a configured offset. The circulant graph with no offsets
+is the [empty graph](#empty-graph) and the graph with the maximum range is the
+[complete graph](#complete-graph).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, vertexCount)
+    .addRange(1, 2)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CirculantGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CirculantGraph(env.getJavaEnv, vertexCount).addRange(1, 2).generate()
+{% endhighlight %}
+</div>
+</div>
+
 ## Complete Graph
 
 An undirected graph connecting every distinct pair of vertices.
@@ -83,7 +119,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -148,7 +184,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -193,6 +229,41 @@ val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
     <text x="51" y="199">4</text>
 </svg>
 
+## Echo Graph
+
+An [echo graph](http://mathworld.wolfram.com/EchoGraph.html) is a
+[circulant graph](#circulant-graph) with `n` vertices defined by the width of a
+single range of offsets centered at `n/2`. A vertex is connected to 'far'
+vertices, which connect to 'near' vertices, which connect to 'far' vertices, ....
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+long vertexDegree = 2;
+
+Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.EchoGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+val vertexDegree = 2
+
+val graph = new EchoGraph(env.getJavaEnv, vertexCount, vertexDegree).generate()
+{% endhighlight %}
+</div>
+</div>
+
 ## Empty Graph
 
 A graph containing no edges.
@@ -204,7 +275,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -257,7 +328,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 boolean wrapEndpoints = false;
 
-Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
     .addDimension(2, wrapEndpoints)
     .addDimension(4, wrapEndpoints)
     .generate();
@@ -327,7 +398,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long dimensions = 3;
 
-Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions)
+Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions)
     .generate();
 {% endhighlight %}
 </div>
@@ -403,7 +474,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5
 
-Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -464,7 +535,7 @@ RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory()
 int vertexCount = 1 << scale;
 int edgeCount = edgeFactor * vertexCount;
 
-Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -505,7 +576,7 @@ int edgeCount = edgeFactor * vertexCount;
 
 boolean clipAndFlip = false;
 
-Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .setConstants(0.57f, 0.19f, 0.19f)
     .setNoise(true, 0.10f)
     .generate();
@@ -542,7 +613,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 long vertexPairCount = 4
 
 // note: configured with the number of vertex pairs
-Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -607,7 +678,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 6;
 
-Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index 5ffe681..07cad1f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -35,8 +35,10 @@ import org.apache.flink.graph.drivers.HITS;
 import org.apache.flink.graph.drivers.JaccardIndex;
 import org.apache.flink.graph.drivers.PageRank;
 import org.apache.flink.graph.drivers.TriangleListing;
+import org.apache.flink.graph.drivers.input.CirculantGraph;
 import org.apache.flink.graph.drivers.input.CompleteGraph;
 import org.apache.flink.graph.drivers.input.CycleGraph;
+import org.apache.flink.graph.drivers.input.EchoGraph;
 import org.apache.flink.graph.drivers.input.EmptyGraph;
 import org.apache.flink.graph.drivers.input.GridGraph;
 import org.apache.flink.graph.drivers.input.HypercubeGraph;
@@ -76,9 +78,11 @@ public class Runner {
 	private static final String OUTPUT = "output";
 
 	private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
+		.addClass(CirculantGraph.class)
 		.addClass(CompleteGraph.class)
 		.addClass(org.apache.flink.graph.drivers.input.CSV.class)
 		.addClass(CycleGraph.class)
+		.addClass(EchoGraph.class)
 		.addClass(EmptyGraph.class)
 		.addClass(GridGraph.class)
 		.addClass(HypercubeGraph.class)
@@ -236,7 +240,12 @@ public class Runner {
 
 		// Input
 
-		input.configure(parameters);
+		try {
+			input.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
+
 		Graph graph = input.create(env);
 
 		// Algorithm

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
new file mode 100644
index 0000000..14ee816
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.generator.CirculantGraph.OffsetRange;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CirculantGraph}.
+ */
+public class CirculantGraph
+extends GeneratedGraph<LongValue> {
+
+	private static final String PREFIX = "range";
+
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	private List<OffsetRange> offsetRanges = new ArrayList<>();
+
+	@Override
+	public String getName() {
+		return CirculantGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getUsage() {
+		return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]"
+			+ super.getUsage();
+	}
+
+	@Override
+	public void configure(ParameterTool parameterTool) throws ProgramParametrizationException {
+		super.configure(parameterTool);
+
+		// add offset ranges as ordered by offset ID (range0, range1, range2, ...)
+
+		Map<Integer, String> offsetRangeMap = new TreeMap<>();
+
+		// first parse all offset ranges into a sorted map
+		for (String key : parameterTool.toMap().keySet()) {
+			if (key.startsWith(PREFIX)) {
+				int offsetId = Integer.parseInt(key.substring(PREFIX.length()));
+				offsetRangeMap.put(offsetId, parameterTool.get(key));
+			}
+		}
+
+		// then store offset ranges in order
+		for (String field : offsetRangeMap.values()) {
+			ProgramParametrizationException exception = new ProgramParametrizationException("Circulant offset range" +
+				" must use a colon to separate the integer offset and integer length:" + field + "'");
+
+			if (! field.contains(":")) {
+				throw exception;
+			}
+
+			String[] parts = field.split(":");
+
+			if (parts.length != 2) {
+				throw exception;
+			}
+
+			try {
+				long offset = Long.parseLong(parts[0]);
+				long length = Long.parseLong(parts[1]);
+				offsetRanges.add(new OffsetRange(offset, length));
+			} catch (NumberFormatException ex) {
+				throw exception;
+			}
+		}
+	}
+
+	@Override
+	public String getIdentity() {
+		return getTypeName() + " " + getName() + " (" + offsetRanges + ")";
+	}
+
+	@Override
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CirculantGraph graph = new org.apache.flink.graph.generator.CirculantGraph(env,
+			vertexCount.getValue());
+
+		for (OffsetRange offsetRange : offsetRanges) {
+			graph.addRange(offsetRange.getOffset(), offsetRange.getLength());
+		}
+
+		return graph
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
new file mode 100644
index 0000000..c9b0874
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT;
+import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE;
+
+/**
+ * Generate an {@link org.apache.flink.graph.generator.EchoGraph}.
+ */
+public class EchoGraph
+extends GeneratedGraph<LongValue> {
+
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter vertexDegree = new LongParameter(this, "vertex_degree")
+		.setMinimumValue(MINIMUM_VERTEX_DEGREE);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return EchoGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")";
+	}
+
+	@Override
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
+		return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue())
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index b41b86e..2ce3c77 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -54,7 +54,8 @@ extends GeneratedGraph<LongValue> {
 
 	@Override
 	public String getUsage() {
-		return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage();
+		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]]"
+			+ super.getUsage();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
index f566218..d3ba4fb 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -44,6 +44,50 @@ extends DriverBaseITCase {
 	}
 
 	@Test
+	public void testHashWithCirculantGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000000344448L;
+				break;
+
+			case "long":
+				checksum = 0x0000000000a19d48L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000000c47ca48L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("CirculantGraph", "hash", "--vertex_count", "42", "--range0", "13:4"),
+			168, checksum);
+	}
+
+	@Test
+	public void testPrintWithCirculantGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("CirculantGraph", "print", "--vertex_count", "42", "--range0", "13:4"),
+			new Checksum(168, 0x0000004bdcc52cbcL));
+	}
+
+	@Test
 	public void testLongDescription() throws Exception {
 		String expected = regexSubstring(new EdgeList().getLongDescription());
 
@@ -142,10 +186,54 @@ extends DriverBaseITCase {
 	}
 
 	@Test
+	public void testHashWithEchoGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000000a9ddeaL;
+				break;
+
+			case "long":
+				checksum = 0x00000000020d3f2aL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x0000000027e9516aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("EchoGraph", "hash", "--vertex_count", "42", "--vertex_degree", "13"),
+			546, checksum);
+	}
+
+	@Test
+	public void testPrintWithEchoGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("EchoGraph", "print", "--vertex_count", "42", "--vertex_degree", "13"),
+			new Checksum(546, 0x000000f7190b8fcaL));
+	}
+
+	@Test
 	public void testHashWithEmptyGraph() throws Exception {
 		expectedChecksum(
 			parameters("EmptyGraph", "hash", "--vertex_count", "42"),
-			0, 0x0000000000000000);
+			0, 0x0000000000000000L);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
new file mode 100644
index 0000000..9569b74
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/CirculantGraph.html">Circulant Graph at Wolfram MathWorld</a>
+ */
+public class CirculantGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
+	public static final int MINIMUM_OFFSET = 1;
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private long vertexCount;
+
+	private List<OffsetRange> offsetRanges = new ArrayList<>();
+
+	/**
+	 * An oriented {@link Graph} with {@code n} vertices where each vertex
+	 * v<sub>i</sub> is connected to vertex v<sub>(i+j)%n</sub> for each
+	 * configured offset {@code j}.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 */
+	public CirculantGraph(ExecutionEnvironment env, long vertexCount) {
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+	}
+
+	/**
+	 * Required configuration for each range of offsets in the graph.
+	 *
+	 * @param offset first offset appointing the vertices' position
+	 * @param length number of contiguous offsets in range
+	 * @return this
+	 */
+	public CirculantGraph addRange(long offset, long length) {
+		Preconditions.checkArgument(offset >= MINIMUM_OFFSET,
+			"Range offset must be at least " + MINIMUM_OFFSET);
+		Preconditions.checkArgument(length <= vertexCount - offset,
+			"Range length must not be greater than the vertex count minus the range offset.");
+
+		offsetRanges.add(new OffsetRange(offset, length));
+
+		return this;
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate() {
+		// Vertices
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+		// Validate ranges
+		Collections.sort(offsetRanges);
+		Iterator<OffsetRange> iter = offsetRanges.iterator();
+		OffsetRange lastRange = iter.next();
+
+		while (iter.hasNext()) {
+			OffsetRange nextRange = iter.next();
+
+			if (lastRange.overlaps(nextRange)) {
+				throw new IllegalArgumentException("Overlapping ranges " + lastRange + " and " + nextRange);
+			}
+
+			lastRange = nextRange;
+		}
+
+		DataSet<Edge<LongValue, NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.flatMap(new LinkVertexToOffsets(vertexCount, offsetRanges))
+				.setParallelism(parallelism)
+				.name("Circulant graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@FunctionAnnotation.ForwardedFields("*->f0")
+	private static class LinkVertexToOffsets
+	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
+		private final long vertexCount;
+
+		private final List<OffsetRange> offsetRanges;
+
+		private LongValue target = new LongValue();
+
+		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+		public LinkVertexToOffsets(long vertexCount, List<OffsetRange> offsetRanges) {
+			this.vertexCount = vertexCount;
+			this.offsetRanges = offsetRanges;
+		}
+
+		@Override
+		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
+				throws Exception {
+			edge.f0 = source;
+			long sourceID = source.getValue();
+
+			for (OffsetRange offsetRange : offsetRanges) {
+				long targetID = sourceID + offsetRange.getOffset();
+
+				for (long i = offsetRange.getLength(); i > 0; i--) {
+					// add positive offset
+					target.setValue(targetID++ % vertexCount);
+					out.collect(edge);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stores the start offset and length configuration for an offset range.
+	 */
+	public static class OffsetRange implements Serializable, Comparable<OffsetRange> {
+		private long offset;
+
+		private long length;
+
+		/**
+		 * Construct a range with the given offset and length.
+		 *
+		 * @param offset the range offset
+		 * @param length the range length
+		 */
+		public OffsetRange(long offset, long length) {
+			this.offset = offset;
+			this.length = length;
+		}
+
+		/**
+		 * Get the range offset
+		 *
+		 * @return the offset
+		 */
+		public long getOffset() {
+			return offset;
+		}
+
+		/**
+		 * Get the range length
+		 *
+		 * @return the length
+		 */
+		public long getLength() {
+			return length;
+		}
+
+		/**
+		 * Get the offset of the last index in the range
+		 *
+		 * @return last offset
+		 */
+		public long getLastOffset() {
+			return offset + length - 1;
+		}
+
+		/**
+		 * Return true if and only if the other range and this range share a
+		 * common offset ID.
+		 *
+		 * @param other other range
+		 * @return whether ranges are overlapping
+		 */
+		public boolean overlaps(OffsetRange other) {
+			boolean overlapping = false;
+
+			long lastOffset = getLastOffset();
+			long otherLastOffset = other.getLastOffset();
+
+			// check whether this range contains other
+			overlapping |= (offset <= other.offset && other.offset <= lastOffset);
+			overlapping |= (offset <= otherLastOffset && otherLastOffset <= lastOffset);
+
+			// check whether other contains this range
+			overlapping |= (other.offset <= offset && offset <= otherLastOffset);
+			overlapping |= (other.offset <= lastOffset && lastOffset <= otherLastOffset);
+
+			return overlapping;
+		}
+
+		@Override
+		public String toString() {
+			return Long.toString(offset) + ":" + Long.toString(length);
+		}
+
+		@Override
+		public int compareTo(OffsetRange o) {
+			int cmp = Long.compare(offset, o.offset);
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			return Long.compare(length, o.length);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index 11c0bb0..9dabe56 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -18,17 +18,10 @@
 
 package org.apache.flink.graph.generator;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.LongValueSequenceIterator;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -61,53 +54,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
-		// Vertices
-		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
-
-		// Edges
-		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
-
-		DataSet<Edge<LongValue, NullValue>> edges = env
-			.fromParallelCollection(iterator, LongValue.class)
+		return new CirculantGraph(env, vertexCount)
+			.addRange(1, vertexCount - 1)
 				.setParallelism(parallelism)
-				.name("Edge iterators")
-			.flatMap(new LinkVertexToAll(vertexCount))
-				.setParallelism(parallelism)
-				.name("Complete graph edges");
-
-		// Graph
-		return Graph.fromDataSet(vertices, edges, env);
-	}
-
-	@ForwardedFields("*->f0")
-	private static class LinkVertexToAll
-	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
-		private final long vertexCount;
-
-		private LongValue target = new LongValue();
-
-		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
-
-		public LinkVertexToAll(long vertexCount) {
-			this.vertexCount = vertexCount;
-		}
-
-		@Override
-		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
-				throws Exception {
-			edge.f0 = source;
-
-			long s = source.getValue();
-			long t = (s + 1) % vertexCount;
-
-			while (s != t) {
-				target.setValue(t);
-				out.collect(edge);
-
-				if (++t == vertexCount) {
-					t = 0;
-				}
-			}
-		}
+				.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
new file mode 100644
index 0000000..c15cdca
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link CirculantGraph} with {@code n} vertices defined by the width of a
+ * single range of offsets centered at {@code n/2}. A vertex is connected to
+ * 'far' vertices, which connect to 'near' vertices, which connect to 'far'
+ * vertices, ....
+ * <p>
+ * Every {@link Vertex} in the {@link EchoGraph} has the same degree.
+ * and vertices as far as possible are chose to be linked.
+ * {@link EchoGraph} is a specific case of {@link CirculantGraph}.
+ */
+public class EchoGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
+	public static final int MINIMUM_VERTEX_DEGREE = 1;
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private long vertexCount;
+
+	private long vertexDegree;
+
+	/**
+	 * An undirected {@link Graph} whose vertices have the same degree.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 * @param vertexDegree degree of vertices
+	 */
+	public EchoGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) {
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
+		Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE,
+			"Vertex degree must be at least " + MINIMUM_VERTEX_DEGREE);
+		Preconditions.checkArgument(vertexDegree < vertexCount,
+			"Vertex degree must be less than the vertex count.");
+		Preconditions.checkArgument(vertexCount % 2 == 0 ^ vertexDegree % 2 == 0,
+			"Vertex count or vertex degree must be an even number but not both.");
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+		this.vertexDegree = vertexDegree;
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate() {
+		return new CirculantGraph(env, vertexCount)
+			.addRange((vertexCount - vertexDegree + 1) / 2, vertexDegree)
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 23a6f98..0570dd2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -85,9 +85,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
-		if (dimensions.isEmpty()) {
-			throw new RuntimeException("No dimensions added to GridGraph");
-		}
+		Preconditions.checkState(!dimensions.isEmpty(), "No dimensions added to GridGraph");
 
 		// Vertices
 		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
new file mode 100644
index 0000000..aae88ca
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class CirculantGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(4, 3)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" +
+			"2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" +
+			"5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" +
+			"8,2; 8,3; 8,4; 9,3; 9,4; 9,5";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 10;
+		int offset = 4;
+		int length = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(offset, length)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(vertexCount * length, graph.numberOfEdges());
+
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
+
+		assertEquals(length, maxInDegree);
+		assertEquals(length, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(4, 2)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee8c69a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
new file mode 100644
index 0000000..777b576
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class EchoGraphTest
+extends AbstractGraphTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testGraphWithEvenVertexCountWithOddVertexDegree()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" +
+			"2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" +
+			"5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" +
+			"8,2; 8,3; 8,4; 9,3; 9,4; 9,5";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphWithOddVertexCountWithEvenVertexDegree()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 9, 2)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8";
+		String edges = "0,4; 0,5; 1,5; 1,6; 2,6; 2,7;" +
+			"3,7; 3,8; 4,8; 4,0; 5,0; 5,1;" +
+			"6,1; 6,2; 7,2; 7,3; 8,3; 8,4";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphWithOddVertexCountWithOddVertexDegree()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex count or vertex degree must be an even number but not both.");
+
+		new EchoGraph(env, 5, 3).generate();
+	}
+
+	@Test
+	public void testGraphWithEvenVertexCountWithEvenVertexDegree()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex count or vertex degree must be an even number but not both.");
+
+		new EchoGraph(env, 6, 2).generate();
+	}
+
+	@Test
+	public void testGraphWithVertexDegreeTooLarge()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex degree must be less than the vertex count.");
+
+		new EchoGraph(env, 8, 8).generate();
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 10;
+		int vertexDegree = 3;
+
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(vertexCount * vertexDegree, graph.numberOfEdges());
+
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
+
+		assertEquals(vertexDegree, maxInDegree);
+		assertEquals(vertexDegree, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}


[2/4] flink git commit: [hotfix] [gelly] Improve graph generator documentation

Posted by gr...@apache.org.
[hotfix] [gelly] Improve graph generator documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54c88263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54c88263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54c88263

Branch: refs/heads/master
Commit: 54c88263963f6a379066464842734b25ac271ba6
Parents: e43dd60
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon May 8 12:45:19 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu May 11 12:42:25 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/graph_generators.md           | 18 +++++++++++-------
 .../flink/graph/generator/CompleteGraph.java      |  2 +-
 .../apache/flink/graph/generator/CycleGraph.java  |  6 ++++--
 .../apache/flink/graph/generator/EmptyGraph.java  |  2 +-
 .../apache/flink/graph/generator/GridGraph.java   |  5 +++--
 .../flink/graph/generator/HypercubeGraph.java     |  6 ++++--
 .../apache/flink/graph/generator/PathGraph.java   |  9 +++++++--
 .../apache/flink/graph/generator/RMatGraph.java   |  6 +++---
 .../flink/graph/generator/SingletonEdgeGraph.java |  7 ++++---
 .../apache/flink/graph/generator/StarGraph.java   |  6 ++++--
 10 files changed, 42 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/docs/dev/libs/gelly/graph_generators.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_generators.md b/docs/dev/libs/gelly/graph_generators.md
index 5598d83..2532ee4 100644
--- a/docs/dev/libs/gelly/graph_generators.md
+++ b/docs/dev/libs/gelly/graph_generators.md
@@ -48,7 +48,7 @@ boolean wrapEndpoints = false;
 
 int parallelism = 4;
 
-Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
     .addDimension(2, wrapEndpoints)
     .addDimension(4, wrapEndpoints)
     .setParallelism(parallelism)
@@ -138,7 +138,8 @@ val graph = new CompleteGraph(env.getJavaEnv, vertexCount).generate()
 
 ## Cycle Graph
 
-An undirected graph where all edges form a single cycle.
+An undirected graph where the set of edges form a single cycle by connecting
+each vertex to two adjacent vertices in a chained loop.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -194,7 +195,7 @@ val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
 
 ## Empty Graph
 
-The graph containing no edges.
+A graph containing no edges.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -316,7 +317,7 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDime
 
 ## Hypercube Graph
 
-An undirected graph where edges form an n-dimensional hypercube. Each vertex
+An undirected graph where edges form an `n`-dimensional hypercube. Each vertex
 in a hypercube connects to one other vertex in each dimension.
 
 <div class="codetabs" markdown="1">
@@ -391,7 +392,9 @@ val graph = new HypercubeGraph(env.getJavaEnv, dimensions).generate()
 
 ## Path Graph
 
-An undirected Graph where all edges form a single path.
+An undirected graph where the set of edges form a single path by connecting
+two `endpoint` vertices with degree `1` and all midpoint vertices with degree
+`2`. A path graph can be formed by removing a single edge from a cycle graph.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -443,7 +446,7 @@ val graph = new PathGraph(env.getJavaEnv, vertexCount).generate()
 
 ## RMat Graph
 
-A directed or undirected power-law graph generated using the
+A directed power-law multigraph generated using the
 [Recursive Matrix (R-Mat)](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model.
 
 RMat is a stochastic generator configured with a source of randomness implementing the
@@ -528,7 +531,8 @@ val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConsta
 
 ## Singleton Edge Graph
 
-An undirected graph containing isolated two-paths.
+An undirected graph containing isolated two-paths where every vertex has degree
+`1`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index 84cb791..11c0bb0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
 import org.apache.flink.util.Preconditions;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/CompleteGraph.html">Complete Graph at Wolfram MathWorld</a>
  */
 public class CompleteGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index b04d78c..2386fe8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -24,7 +24,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Preconditions;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/CycleGraph.html">Cycle Graph at Wolfram MathWorld</a>
  */
 public class CycleGraph
@@ -39,7 +39,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long vertexCount;
 
 	/**
-	 * An undirected {@link Graph} where all edges form a single cycle.
+	 * An undirected {@link Graph} with {@code n} vertices where each vertex
+	 * v<sub>i</sub> is connected to adjacent vertices v<sub>(i+1)%n</sub> and
+	 * v<sub>(i-1)%n</sub>.
 	 *
 	 * @param env the Flink execution environment
 	 * @param vertexCount number of vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 25584ea..23e3a9c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/EmptyGraph.html">Empty Graph at Wolfram MathWorld</a>
  */
 public class EmptyGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 16a9ab9..23a6f98 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -35,7 +35,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/GridGraph.html">Grid Graph at Wolfram MathWorld</a>
  */
 public class GridGraph
@@ -50,7 +50,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long vertexCount = 1;
 
 	/**
-	 * An undirected {@link Graph} connecting vertices in a regular tiling in one or more dimensions.
+	 * An undirected {@code Graph} connecting vertices in a regular tiling in
+	 * one or more dimensions and where the endpoints are optionally connected.
 	 *
 	 * @param env the Flink execution environment
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
index 37590ff..e4eac69 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -24,7 +24,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Preconditions;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/HypercubeGraph.html">Hypercube Graph at Wolfram MathWorld</a>
  */
 public class HypercubeGraph
@@ -39,7 +39,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long dimensions;
 
 	/**
-	 * An undirected {@link Graph} where edges form an n-dimensional hypercube.
+	 * An undirected {@code Graph} where edges form an n-dimensional hypercube.
+	 * Each vertex in a hypercube connects to one other vertex in each
+	 * dimension.
 	 *
 	 * @param env the Flink execution environment
 	 * @param dimensions number of dimensions

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index dcc4c98..5c4343b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -24,7 +24,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Preconditions;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/PathGraph.html">Path Graph at Wolfram MathWorld</a>
  */
 public class PathGraph
@@ -39,7 +39,12 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long vertexCount;
 
 	/**
-	 * An undirected {@link Graph} where all edges form a single path.
+	 * An undirected {@link Graph} with {@code n} vertices where each vertex
+	 * v<sub>i</sub> connects to adjacent vertices v<sub>i+1</sub> when
+	 * {@code i < n-1} and v<sub>i-1</sub> when {@code i > 0}.
+	 * <p>
+	 * A {@code PathGraph} is distinguished from a {@code CycleGraph} in that
+	 * the first and last vertex are not connected, breaking the cycle.
 	 *
 	 * @param env the Flink execution environment
 	 * @param vertexCount number of vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 0fa4127..071b415 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
-/*
+/**
  * @see <a href="http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf">R-MAT: A Recursive Model for Graph Mining</a>
  */
 public class RMatGraph<T extends RandomGenerator>
@@ -75,8 +75,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	public float noise = DEFAULT_NOISE;
 
 	/**
-	 * Generate a directed or undirected power-law {@link Graph} using the
-	 * Recursive Matrix (R-Mat) model.
+	 * A directed power-law multi{@link Graph graph} generated using the
+	 * stochastic Recursive Matrix (R-Mat) model.
 	 *
 	 * @param env the Flink execution environment
 	 * @param randomGeneratorFactory source of randomness

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index f3c087e..125501c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -31,8 +31,7 @@ import org.apache.flink.util.LongValueSequenceIterator;
 import org.apache.flink.util.Preconditions;
 
 /**
- * A singleton-edge {@link Graph} contains one or more isolated two-paths. The in- and out-degree
- * of every vertex is 1. For {@code n} vertices there are {@code n/2} components.
+ * A singleton-edge {@link Graph} contains one or more isolated two-paths.
  */
 public class SingletonEdgeGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
@@ -46,7 +45,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long vertexPairCount;
 
 	/**
-	 * An undirected {@link Graph} containing isolated two-paths.
+	 * An undirected {@link Graph} containing one or more isolated two-paths.
+	 * The in- and out-degree of every vertex is 1. For {@code n} vertices
+	 * there are {@code n/2} components.
 	 *
 	 * @param env the Flink execution environment
 	 * @param vertexPairCount number of pairs of vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/54c88263/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 316d749..f0f4e5a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
 import org.apache.flink.util.Preconditions;
 
-/*
+/**
  * @see <a href="http://mathworld.wolfram.com/StarGraph.html">Star Graph at Wolfram MathWorld</a>
  */
 public class StarGraph
@@ -46,7 +46,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private long vertexCount;
 
 	/**
-	 * An undirected {@link Graph} containing a single central {@link Vertex} connected to all other leaf vertices.
+	 * An undirected {@Graph} with {@code n} vertices where the single central
+	 * node has degree {@code n-1}, connecting to the other {@code n-1}
+	 * vertices which have degree {@code 1}.
 	 *
 	 * @param env the Flink execution environment
 	 * @param vertexCount number of vertices


[4/4] flink git commit: [hotfix] [docs] Update file path of Gelly examples

Posted by gr...@apache.org.
[hotfix] [docs] Update file path of Gelly examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e43dd60b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e43dd60b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e43dd60b

Branch: refs/heads/master
Commit: e43dd60b58849295dca715c13486c7cbd08da151
Parents: fca8cae
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu May 4 13:37:15 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu May 11 12:42:25 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e43dd60b/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 193ba86..7ae7968 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -86,7 +86,7 @@ After configuring and starting the cluster, list the available algorithm classes
 
 ~~~bash
 ./bin/start-cluster.sh
-./bin/flink run examples/flink-gelly-examples_*.jar
+./bin/flink run examples/gelly/flink-gelly-examples_*.jar
 ~~~
 
 The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access
@@ -94,13 +94,13 @@ to the input file). The algorithm description, available inputs and outputs, and
 algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index):
 
 ~~~bash
-./bin/flink run examples/flink-gelly-examples_*.jar --algorithm JaccardIndex
+./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm JaccardIndex
 ~~~
 
 Display [graph metrics](./library_methods.html#metric) for a million vertex graph:
 
 ~~~bash
-./bin/flink run examples/flink-gelly-examples_*.jar \
+./bin/flink run examples/gelly/flink-gelly-examples_*.jar \
     --algorithm GraphMetrics --order directed \
     --input RMatGraph --type integer --scale 20 --simplify directed \
     --output print
@@ -117,17 +117,17 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
 ~~~bash
 wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
 
-./bin/flink run -q examples/flink-gelly-examples_*.jar \
+./bin/flink run -q examples/gelly/flink-gelly-examples_*.jar \
     --algorithm GraphMetrics --order undirected \
     --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output print
 
-./bin/flink run -q examples/flink-gelly-examples_*.jar \
+./bin/flink run -q examples/gelly/flink-gelly-examples_*.jar \
     --algorithm ClusteringCoefficient --order undirected \
     --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
 
-./bin/flink run -q examples/flink-gelly-examples_*.jar \
+./bin/flink run -q examples/gelly/flink-gelly-examples_*.jar \
     --algorithm JaccardIndex \
     --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash