You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:27 UTC

[4/6] flink git commit: [FLINK-5912] [gelly] Inputs for CSV and graph generators

[FLINK-5912] [gelly] Inputs for CSV and graph generators

Create Input classes for reading graphs from CSV as well as for each of
the graph generators.

This closes #3626


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded25be4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded25be4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded25be4

Branch: refs/heads/master
Commit: ded25be4d8fc51f2a58cbd3264daffe48dca6b04
Parents: 963f46e
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Mar 27 10:08:59 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 11:16:05 2017 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/input/CSV.java   | 115 ++++++++++++
 .../graph/drivers/input/CompleteGraph.java      |  60 +++++++
 .../flink/graph/drivers/input/CycleGraph.java   |  60 +++++++
 .../flink/graph/drivers/input/EmptyGraph.java   |  55 ++++++
 .../flink/graph/drivers/input/GridGraph.java    | 144 +++++++++++++++
 .../graph/drivers/input/HypercubeGraph.java     |  60 +++++++
 .../flink/graph/drivers/input/PathGraph.java    |  60 +++++++
 .../flink/graph/drivers/input/RMatGraph.java    | 174 +++++++++++++++++++
 .../graph/drivers/input/SingletonEdgeGraph.java |  60 +++++++
 .../flink/graph/drivers/input/StarGraph.java    |  60 +++++++
 .../graph/drivers/parameter/Parameterized.java  |   2 +-
 .../drivers/parameter/ParameterizedBase.java    |   2 +-
 .../flink/graph/drivers/parameter/Simplify.java | 137 +++++++++++++++
 .../graph/drivers/parameter/SimplifyTest.java   |  56 ++++++
 14 files changed, 1043 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
new file mode 100644
index 0000000..58b65b6
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Read a {@link Graph} from a CSV file using {@link IntValue},
+ * {@link LongValue}, or {@link StringValue} keys.
+ *
+ * @param <K> key type
+ */
+public class CSV<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+	// choices for the "type" parameter, which selects the graph key class
+	private static final String INTEGER = "integer";
+
+	private static final String LONG = "long";
+
+	private static final String STRING = "string";
+
+	private ChoiceParameter type = new ChoiceParameter(this, "type")
+		.setDefaultValue(INTEGER)
+		.addChoices(LONG, STRING);
+
+	// path handed to Graph.fromCsvReader; no default value is set
+	private StringParameter inputFilename = new StringParameter(this, "input_filename");
+
+	// value handed to the reader's ignoreCommentsEdges
+	private StringParameter commentPrefix = new StringParameter(this, "comment_prefix")
+		.setDefaultValue("#");
+
+	// value handed to the reader's lineDelimiterEdges
+	private StringParameter lineDelimiter = new StringParameter(this, "input_line_delimiter")
+		.setDefaultValue(CsvInputFormat.DEFAULT_LINE_DELIMITER);
+
+	// value handed to the reader's fieldDelimiterEdges
+	private StringParameter fieldDelimiter = new StringParameter(this, "input_field_delimiter")
+		.setDefaultValue(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+
+	private Simplify simplify = new Simplify(this);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		return CSV.class.getSimpleName();
+	}
+
+	/**
+	 * @return the capitalized name followed by the capitalized key type and,
+	 *         in parentheses, the string form of the input filename parameter
+	 */
+	@Override
+	public String getIdentity() {
+		return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
+	}
+
+	/**
+	 * Read the graph from the configured CSV file using the configured key
+	 * type, then apply the configured simplification.
+	 *
+	 * @param env execution environment
+	 * @return input graph
+	 * @throws ProgramParametrizationException if the configured type is not
+	 *         one of the known choices
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
+		GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env)
+			.ignoreCommentsEdges(commentPrefix.getValue())
+			.lineDelimiterEdges(lineDelimiter.getValue())
+			.fieldDelimiterEdges(fieldDelimiter.getValue());
+
+		Graph<K, NullValue, NullValue> graph;
+
+		// the key class is only known at runtime from the "type" parameter,
+		// so each branch needs an unchecked cast to the generic key type K
+		switch (type.getValue()) {
+			case INTEGER:
+				graph = (Graph<K, NullValue, NullValue>) reader
+					.keyType(IntValue.class);
+				break;
+
+			case LONG:
+				graph = (Graph<K, NullValue, NullValue>) reader
+					.keyType(LongValue.class);
+				break;
+
+			case STRING:
+				graph = (Graph<K, NullValue, NullValue>) reader
+					.keyType(StringValue.class);
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
+		}
+
+		return simplify.simplify(graph);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
new file mode 100644
index 0000000..c31c5a8
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}.
+ */
+public class CompleteGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// "vertex_count" parameter; its lower bound is the statically imported
+	// MINIMUM_VERTEX_COUNT of the generator class
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		Class<?> self = CompleteGraph.class;
+		return self.getSimpleName();
+	}
+
+	/**
+	 * @return the name followed by the configured vertex count in parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		StringBuilder identity = new StringBuilder();
+		identity.append(getName());
+		identity.append(" (");
+		identity.append(vertexCount.getValue());
+		identity.append(")");
+		return identity.toString();
+	}
+
+	/**
+	 * Build the generator from the configured vertex count and parallelism
+	 * and return the graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CompleteGraph generator =
+			new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue());
+
+		int parallelism = littleParallelism.getValue().intValue();
+
+		return generator
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
new file mode 100644
index 0000000..df66dab
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CycleGraph}.
+ */
+public class CycleGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// "vertex_count" parameter; its lower bound is the statically imported
+	// MINIMUM_VERTEX_COUNT of the generator class
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		String simpleName = CycleGraph.class.getSimpleName();
+		return simpleName;
+	}
+
+	/**
+	 * @return the name followed by the string form of the vertex count
+	 *         parameter in parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		String name = getName();
+		String detail = String.valueOf(vertexCount);
+		return name + " (" + detail + ")";
+	}
+
+	/**
+	 * Build the generator from the configured vertex count and parallelism
+	 * and return the graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CycleGraph generator =
+			new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue());
+
+		int parallelism = littleParallelism.getValue().intValue();
+
+		return generator
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
new file mode 100644
index 0000000..794317b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.graph.generator.EmptyGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate an {@link org.apache.flink.graph.generator.EmptyGraph}.
+ */
+public class EmptyGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// "vertex_count" parameter; its lower bound is the statically imported
+	// MINIMUM_VERTEX_COUNT of the generator class
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		Class<?> self = EmptyGraph.class;
+		return self.getSimpleName();
+	}
+
+	/**
+	 * @return the name followed by the string form of the vertex count
+	 *         parameter in parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		StringBuilder identity = new StringBuilder();
+		identity.append(getName()).append(" (");
+		identity.append(String.valueOf(vertexCount)).append(")");
+		return identity.toString();
+	}
+
+	/**
+	 * Build the generator from the configured vertex count and return the
+	 * graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.EmptyGraph generator =
+			new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue());
+
+		return generator.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
new file mode 100644
index 0000000..d502215
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.GridGraph}.
+ */
+public class GridGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// prefix of the per-dimension command-line keys: dim0, dim1, dim2, ...
+	private static final String PREFIX = "dim";
+
+	// dimensions parsed by configure(), ordered by dimension ID
+	private List<Dimension> dimensions = new ArrayList<>();
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		return GridGraph.class.getSimpleName();
+	}
+
+	/**
+	 * @return usage for the dimension arguments followed by the usage
+	 *         reported by the superclass
+	 */
+	@Override
+	public String getUsage() {
+		return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage();
+	}
+
+	/**
+	 * Configure the registered parameters and parse every argument whose key
+	 * starts with {@code dim} into a grid dimension. Dimensions are stored in
+	 * ascending order of the integer following the prefix.
+	 *
+	 * @param parameterTool parsed command-line arguments
+	 * @throws ProgramParametrizationException if a dimension key does not end
+	 *         in an integer ID or a dimension value is malformed
+	 */
+	@Override
+	public void configure(ParameterTool parameterTool) throws ProgramParametrizationException {
+		super.configure(parameterTool);
+
+		// discard dimensions from any earlier call so that configuring the
+		// same instance again does not append duplicates
+		dimensions.clear();
+
+		// sort by the numeric dimension ID rather than by the string key so
+		// that dim10 follows dim9
+		Map<Integer, String> dimensionMap = new TreeMap<>();
+
+		// first parse all dimensions into a sorted map
+		for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {
+			String key = entry.getKey();
+
+			if (key.startsWith(PREFIX)) {
+				int dimensionId;
+
+				try {
+					dimensionId = Integer.parseInt(key.substring(PREFIX.length()));
+				} catch (NumberFormatException ex) {
+					// report a key such as "dim" or "dimX" as a
+					// parameterization error rather than leaking the
+					// NumberFormatException to the caller
+					throw new ProgramParametrizationException("Grid dimension key must be '" + PREFIX +
+						"' followed by an integer dimension ID: '" + key + "'");
+				}
+
+				dimensionMap.put(dimensionId, entry.getValue());
+			}
+		}
+
+		// then store dimensions in order
+		for (String field : dimensionMap.values()) {
+			dimensions.add(new Dimension(field));
+		}
+	}
+
+	/**
+	 * @return the name followed by the string form of the dimension list in
+	 *         parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + dimensions + ")";
+	}
+
+	/**
+	 * Build the generator, add each configured dimension in order, and
+	 * return the graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env);
+
+		for (Dimension dimension : dimensions) {
+			graph.addDimension(dimension.size, dimension.wrapEndpoints);
+		}
+
+		return graph
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+
+	/**
+	 * Stores and parses the size and endpoint wrapping configuration for a
+	 * {@link org.apache.flink.graph.generator.GridGraph} dimension.
+	 */
+	private static class Dimension {
+		private final long size;
+
+		private final boolean wrapEndpoints;
+
+		/**
+		 * Configuration string to be parsed. The size integer and endpoint
+		 * wrapping boolean must be separated by a colon.
+		 *
+		 * @param field configuration string
+		 * @throws ProgramParametrizationException if the string does not
+		 *         split into exactly two parts on the colon or the size is
+		 *         not a valid long
+		 */
+		public Dimension(String field) {
+			String[] parts = field.split(":");
+
+			// a field without a colon yields a single part and a field with
+			// an empty trailing part has that part dropped by split, so
+			// both fail this check
+			if (parts.length != 2) {
+				throw malformed(field);
+			}
+
+			long parsedSize;
+
+			try {
+				parsedSize = Long.parseLong(parts[0]);
+			} catch (NumberFormatException ex) {
+				throw malformed(field);
+			}
+
+			size = parsedSize;
+
+			// Boolean.parseBoolean never throws: any text other than "true"
+			// (ignoring case) is read as false
+			wrapEndpoints = Boolean.parseBoolean(parts[1]);
+		}
+
+		/**
+		 * Create the exception reported for an unparseable configuration
+		 * string. Built on demand so that valid input does not pay for an
+		 * unused exception and its stack trace.
+		 *
+		 * @param field the rejected configuration string
+		 * @return exception describing the expected format
+		 */
+		private static ProgramParametrizationException malformed(String field) {
+			return new ProgramParametrizationException("Grid dimension must use " +
+				"a colon to separate the integer size and boolean indicating whether the dimension endpoints are " +
+				"connected: '" + field + "'");
+		}
+
+		@Override
+		public String toString() {
+			return Long.toString(size) + (wrapEndpoints ? "+" : "\u229e");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
new file mode 100644
index 0000000..3b87b00
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.HypercubeGraph}.
+ */
+public class HypercubeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// "dimensions" parameter; its lower bound is the statically imported
+	// MINIMUM_DIMENSIONS of the generator class
+	private LongParameter dimensions = new LongParameter(this, "dimensions")
+		.setMinimumValue(MINIMUM_DIMENSIONS);
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		String simpleName = HypercubeGraph.class.getSimpleName();
+		return simpleName;
+	}
+
+	/**
+	 * @return the name followed by the string form of the dimensions
+	 *         parameter in parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		StringBuilder identity = new StringBuilder();
+		identity.append(getName());
+		identity.append(" (");
+		identity.append(String.valueOf(dimensions));
+		identity.append(")");
+		return identity.toString();
+	}
+
+	/**
+	 * Build the generator from the configured dimensions and parallelism and
+	 * return the graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.HypercubeGraph generator =
+			new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue());
+
+		int parallelism = littleParallelism.getValue().intValue();
+
+		return generator
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
new file mode 100644
index 0000000..968628c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.PathGraph}.
+ */
+public class PathGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	// "vertex_count" parameter; its lower bound is the statically imported
+	// MINIMUM_VERTEX_COUNT of the generator class
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		Class<?> self = PathGraph.class;
+		return self.getSimpleName();
+	}
+
+	/**
+	 * @return the name followed by the string form of the vertex count
+	 *         parameter in parentheses
+	 */
+	@Override
+	public String getIdentity() {
+		String name = getName();
+		String detail = String.valueOf(vertexCount);
+		return name + " (" + detail + ")";
+	}
+
+	/**
+	 * Build the generator from the configured vertex count and parallelism
+	 * and return the graph it generates.
+	 *
+	 * @param env execution environment
+	 * @return generated graph
+	 */
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.PathGraph generator =
+			new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue());
+
+		int parallelism = littleParallelism.getValue().intValue();
+
+		return generator
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
new file mode 100644
index 0000000..e4e6a4c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.Simplify;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue},
+ * or {@link StringValue} keys.
+ *
+ * @see org.apache.flink.graph.generator.RMatGraph
+ *
+ * @param <K> key type
+ */
+public class RMatGraph<K extends Comparable<K>>
+extends ParameterizedBase
+implements Input<K, NullValue, NullValue> {
+
+	// choices for the "type" parameter, which selects the graph key class
+	private static final String INTEGER = "integer";
+
+	private static final String LONG = "long";
+
+	private static final String STRING = "string";
+
+	// largest scale accepted for 'integer' keys
+	private static final long MAXIMUM_INTEGER_SCALE = 32;
+
+	// largest scale for which the vertex count 1L << scale is still a
+	// positive long; a long shift uses only the low six bits of the
+	// distance, so 1L << 63 is Long.MIN_VALUE and 1L << 64 is 1
+	private static final long MAXIMUM_LONG_SCALE = 62;
+
+	private ChoiceParameter type = new ChoiceParameter(this, "type")
+		.setDefaultValue(INTEGER)
+		.addChoices(LONG, STRING);
+
+	// generate graph with 2^scale vertices
+	private LongParameter scale = new LongParameter(this, "scale")
+		.setDefaultValue(10)
+		.setMinimumValue(1);
+
+	// generate graph with edgeFactor * 2^scale edges
+	private LongParameter edgeFactor = new LongParameter(this, "edge_factor")
+		.setDefaultValue(16)
+		.setMinimumValue(1);
+
+	// matrix parameters "a", "b", "c", and implicitly "d = 1 - a - b - c"
+	// describe the skew in the recursive matrix
+	private DoubleParameter a = new DoubleParameter(this, "a")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_A)
+		.setMinimumValue(0.0, false);
+
+	private DoubleParameter b = new DoubleParameter(this, "b")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_B)
+		.setMinimumValue(0.0, false);
+
+	private DoubleParameter c = new DoubleParameter(this, "c")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_C)
+		.setMinimumValue(0.0, false);
+
+	// noise randomly perturbs the matrix parameters for each successive bit
+	// for each generated edge
+	private BooleanParameter noiseEnabled = new BooleanParameter(this, "noise_enabled");
+
+	private DoubleParameter noise = new DoubleParameter(this, "noise")
+		.setDefaultValue(org.apache.flink.graph.generator.RMatGraph.DEFAULT_NOISE)
+		.setMinimumValue(0.0, true)
+		.setMaximumValue(2.0, true);
+
+	private Simplify simplify = new Simplify(this);
+
+	// "little_parallelism" parameter; its value is passed to the generator's
+	// setParallelism
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	/**
+	 * @return the simple name of this class
+	 */
+	@Override
+	public String getName() {
+		return RMatGraph.class.getSimpleName();
+	}
+
+	/**
+	 * @return the name followed by the capitalized key type and, in
+	 *         parentheses, the scale, edge factor, and simplify short string
+	 */
+	@Override
+	public String getIdentity() {
+		return getName() + WordUtils.capitalize(type.getValue()) +
+			" (s" + scale + "e" + edgeFactor + simplify.getShortString() + ")";
+	}
+
+	/**
+	 * Generate the graph as configured, translate the vertex IDs to the
+	 * configured key type, and apply the configured simplification.
+	 *
+	 * @param env Flink execution environment
+	 * @return input graph
+	 * @throws ProgramParametrizationException if the key type is unknown,
+	 *         the scale is too large for the key type, or the edge count
+	 *         does not fit in a long
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
+		String keyType = type.getValue();
+		long scaleValue = scale.getValue();
+
+		// validate before shifting: an out-of-range shift distance wraps
+		// silently instead of failing
+		validateScale(keyType, scaleValue);
+
+		long vertexCount = 1L << scaleValue;
+		long edgeFactorValue = edgeFactor.getValue();
+
+		// vertexCount is positive here, so the division is safe and detects
+		// a product that would overflow a long
+		if (edgeFactorValue > Long.MAX_VALUE / vertexCount) {
+			throw new ProgramParametrizationException(
+				"Edge factor '" + edgeFactorValue + "' is too large for scale '" + scaleValue +
+					"': the edge count must fit in a signed 64-bit integer");
+		}
+
+		long edgeCount = vertexCount * edgeFactorValue;
+
+		int lp = littleParallelism.getValue().intValue();
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		Graph<LongValue, NullValue, NullValue> rmatGraph = new org.apache.flink.graph.generator.RMatGraph<>(
+				env, rnd, vertexCount, edgeCount)
+			.setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue())
+			.setNoise(noiseEnabled.getValue(), noise.getValue().floatValue())
+			.setParallelism(lp)
+			.generate();
+
+		Graph<K, NullValue, NullValue> graph;
+
+		// the key class is only known at runtime from the "type" parameter,
+		// so each branch needs an unchecked cast to the generic key type K
+		switch (keyType) {
+			case INTEGER:
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph
+					.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()));
+				break;
+
+			case LONG:
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph;
+				break;
+
+			case STRING:
+				graph = (Graph<K, NullValue, NullValue>) rmatGraph
+					.run(new TranslateGraphIds<LongValue, StringValue, NullValue, NullValue>(new LongValueToStringValue()));
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + keyType + "'");
+		}
+
+		// simplify *after* the translation from LongValue to IntValue or
+		// StringValue to improve the performance of the simplify operators
+		return simplify.simplify(graph);
+	}
+
+	/**
+	 * Check that the scale is within the bound for the given key type.
+	 *
+	 * @param keyType configured key type
+	 * @param scaleValue configured scale
+	 * @throws ProgramParametrizationException if the key type is unknown or
+	 *         the scale exceeds the bound for the key type
+	 */
+	private static void validateScale(String keyType, long scaleValue) {
+		long maximumScale;
+
+		switch (keyType) {
+			case INTEGER:
+				maximumScale = MAXIMUM_INTEGER_SCALE;
+				break;
+
+			case LONG:
+			case STRING:
+				// scale bound is same as LONG since keys are generated as LongValue
+				maximumScale = MAXIMUM_LONG_SCALE;
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown type '" + keyType + "'");
+		}
+
+		if (scaleValue > maximumScale) {
+			throw new ProgramParametrizationException(
+				"Scale '" + scaleValue + "' must be no greater than " + maximumScale +
+					" for type '" + keyType + "'");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
new file mode 100644
index 0000000..502826f
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.SingletonEdgeGraph}.
+ */
+public class SingletonEdgeGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private LongParameter vertexPairCount = new LongParameter(this, "vertex_pair_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return SingletonEdgeGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexPairCount + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue())
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
new file mode 100644
index 0000000..b794f5c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.StarGraph}.
+ */
+public class StarGraph
+extends ParameterizedBase
+implements Input<LongValue, NullValue, NullValue> {
+
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return StarGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getName() + " (" + vertexCount + ")";
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
+		return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue())
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
index b24f8cf..7b291be 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameterized.java
@@ -39,7 +39,7 @@ public interface Parameterized {
 	 *
 	 * @return command-line documentation string
 	 */
-	String getParameterization();
+	String getUsage();
 
 	/**
 	 * Read parameter values from the command-line arguments.

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
index 3b9b80a..5f36ff5 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
@@ -45,7 +45,7 @@ implements Parameterized {
 	}
 
 	@Override
-	public String getParameterization() {
+	public String getUsage() {
 		StrBuilder strBuilder = new StrBuilder();
 
 		// print parameters as ordered list

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
new file mode 100644
index 0000000..3e9fd9a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A simple graph has no self-loops (edges where the source and target vertices
+ * are the same) and no duplicate edges. Flink stores an undirected graph as
+ * a directed graph where each undirected edge is represented by a directed
+ * edge in each direction.
+ *
+ * <p>This {@link Parameter} indicates whether to simplify the graph and if the
+ * graph should be directed or undirected.
+ */
+public class Simplify
+implements Parameter<Ordering> {
+
+	/**
+	 * The kind of simplification selected on the command line.
+	 */
+	public enum Ordering {
+		/** Leave the graph unchanged. */
+		NONE,
+
+		/** Create a simple, directed graph. */
+		DIRECTED,
+
+		/** Create a simple, undirected graph. */
+		UNDIRECTED,
+
+		/**
+		 * Create a simple, undirected graph; remove input edges where
+		 * source < target before symmetrizing the graph.
+		 */
+		UNDIRECTED_CLIP_AND_FLIP,
+	}
+
+	// set by configure(); null until then
+	private Ordering value;
+
+	/**
+	 * Add this parameter to the list of parameters stored by owner.
+	 *
+	 * @param owner the {@link Parameterized} using this {@link Parameter}
+	 */
+	public Simplify(ParameterizedBase owner) {
+		owner.addParameter(this);
+	}
+
+	/**
+	 * @return command-line documentation string for this parameter
+	 */
+	@Override
+	public String getUsage() {
+		return "[--simplify <directed | undirected [--clip_and_flip]>]";
+	}
+
+	/**
+	 * Read the "simplify" argument and, for an undirected ordering, the
+	 * presence of the "clip_and_flip" flag. The ordering keyword is matched
+	 * case-insensitively; a missing argument selects {@link Ordering#NONE}.
+	 *
+	 * @param parameterTool parameter parser
+	 * @throws ProgramParametrizationException if the ordering is neither
+	 *         'directed' nor 'undirected'
+	 */
+	@Override
+	public void configure(ParameterTool parameterTool) {
+		String ordering = parameterTool.get("simplify");
+
+		if (ordering == null) {
+			value = Ordering.NONE;
+		} else {
+			// lower-case with a fixed locale so that keyword matching does not
+			// depend on the JVM default locale (for example, the Turkish locale
+			// maps 'I' to a dotless 'i' and "DIRECTED" would not be recognized)
+			switch (ordering.toLowerCase(java.util.Locale.ROOT)) {
+				case "directed":
+					value = Ordering.DIRECTED;
+					break;
+				case "undirected":
+					value = parameterTool.has("clip_and_flip") ? Ordering.UNDIRECTED_CLIP_AND_FLIP : Ordering.UNDIRECTED;
+					break;
+				default:
+					throw new ProgramParametrizationException(
+						"Expected 'directed' or 'undirected' ordering but received '" + ordering + "'");
+			}
+		}
+	}
+
+	/**
+	 * @return the ordering selected by the last call to
+	 *         {@link #configure(ParameterTool)}
+	 */
+	@Override
+	public Ordering getValue() {
+		return value;
+	}
+
+	/**
+	 * Simplify the given graph based on the configured value.
+	 *
+	 * @param graph input graph
+	 * @param <T> graph key type
+	 * @return output graph
+	 * @throws Exception on error
+	 */
+	public <T extends Comparable<T>> Graph<T, NullValue, NullValue> simplify(Graph<T, NullValue, NullValue> graph)
+			throws Exception {
+
+		switch (value) {
+			case DIRECTED:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.directed.Simplify<T, NullValue, NullValue>());
+				break;
+			case UNDIRECTED:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(false));
+				break;
+			case UNDIRECTED_CLIP_AND_FLIP:
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(true));
+				break;
+			case NONE:
+			default:
+				// the graph is returned unchanged
+				break;
+		}
+
+		return graph;
+	}
+
+	/**
+	 * Abbreviate the configured value: "d" for directed, "u" for undirected,
+	 * the open-o character (U+0254) for undirected with clip-and-flip, and the
+	 * empty string otherwise.
+	 *
+	 * @return short string for the configured value
+	 */
+	public String getShortString() {
+		switch (value) {
+			case DIRECTED:
+				return "d";
+			case UNDIRECTED:
+				return "u";
+			case UNDIRECTED_CLIP_AND_FLIP:
+				return "\u0254";
+			default:
+				return "";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ded25be4/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
new file mode 100644
index 0000000..12ae7dc
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/SimplifyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.Simplify.Ordering;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimplifyTest
+extends ParameterTestBase {
+
+	private Simplify parameter;
+
+	@Before
+	public void setup() {
+		super.setup();
+
+		parameter = new Simplify(owner);
+	}
+
+	@Test
+	public void testWithDirected() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "directed"}));
+		Assert.assertEquals(Ordering.DIRECTED, parameter.getValue());
+	}
+
+	@Test
+	public void testWithUndirected() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--simplify", "undirected"}));
+		Assert.assertEquals(Ordering.UNDIRECTED, parameter.getValue());
+	}
+
+	@Test
+	public void testWithNoParameter() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{}));
+		Assert.assertEquals(Ordering.NONE, parameter.getValue());
+	}
+}