You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 20:34:57 UTC

flink git commit: [FLINK-4949] [gelly] Refactor Gelly driver inputs

Repository: flink
Updated Branches:
  refs/heads/master 31e120a98 -> f1ff99fdc


[FLINK-4949] [gelly] Refactor Gelly driver inputs

The Gelly drivers started as simple wrappers around library algorithms
but have grown to handle a matrix of input sources while often running
multiple algorithms and analytics with custom parameterization.

The monolithic drivers are replaced with separate inputs and algorithms.
Command-line parameter parsers are shared and reusable across inputs and
algorithms. Algorithm results now implement a common AlgorithmResult
interface. Drivers are now tested with integration tests.

This closes #3294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1ff99fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1ff99fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1ff99fd

Branch: refs/heads/master
Commit: f1ff99fdc1e228acd936f5684832d5cf49bdbe04
Parents: 31e120a
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 15:57:54 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md                    |  40 ++-
 flink-libraries/flink-gelly-examples/pom.xml    |   9 +-
 .../java/org/apache/flink/graph/Runner.java     | 357 +++++++++++++++++++
 .../main/java/org/apache/flink/graph/Usage.java |  71 ----
 .../org/apache/flink/graph/RunnerITCase.java    | 122 +++++++
 .../flink/graph/drivers/AdamicAdarITCase.java   |  52 +++
 .../drivers/ClusteringCoefficientITCase.java    |  89 +++++
 .../drivers/ConnectedComponentsITCase.java      |  65 ++++
 .../flink/graph/drivers/DriverBaseITCase.java   | 185 ++++++++++
 .../flink/graph/drivers/EdgeListITCase.java     | 240 +++++++++++++
 .../flink/graph/drivers/GraphMetricsITCase.java | 100 ++++++
 .../apache/flink/graph/drivers/HITSITCase.java  |  52 +++
 .../flink/graph/drivers/JaccardIndexITCase.java |  63 ++++
 .../flink/graph/drivers/PageRankITCase.java     |  52 +++
 .../graph/drivers/TriangleListingITCase.java    | 107 ++++++
 15 files changed, 1518 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 8f8c6de..40018e8 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -73,7 +73,7 @@ Running Gelly Examples
 
 The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
 in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from
-[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly).
+[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)).
 
 To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
 Flink's **lib** directory.
@@ -83,21 +83,29 @@ cp opt/flink-gelly_*.jar lib/
 cp opt/flink-gelly-scala_*.jar lib/
 ~~~
 
-Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After
-configuring and starting the cluster, list the available algorithm classes:
+Gelly's examples jar includes drivers for each of the library methods. After configuring and starting the cluster, list
+the available algorithm classes:
 
 ~~~bash
 ./bin/start-cluster.sh
 ./bin/flink run opt/flink-gelly-examples_*.jar
 ~~~
 
-The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
-edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a
-directed generated graph:
+The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access
+to the input file). The algorithm description, available inputs and outputs, and configuration are displayed when an
+algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index):
 
 ~~~bash
-./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
-    --directed true --input rmat
+./bin/flink run opt/flink-gelly-examples_*.jar --algorithm JaccardIndex
+~~~
+
+Display [graph metrics](./library_methods.html#metric) for a million vertex graph:
+
+~~~bash
+./bin/flink run opt/flink-gelly-examples_*.jar \
+    --algorithm GraphMetrics --order directed \
+    --input RMatGraph --type integer --scale 20 --simplify directed \
+    --output print
 ~~~
 
 The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
@@ -111,15 +119,19 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
 ~~~bash
 wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
-    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm GraphMetrics --order undirected \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
+    --output print
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \
-    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt  --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm ClusteringCoefficient --order undirected \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/flink-gelly-examples_*.jar \
-    --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+./bin/flink run -q opt/flink-gelly-examples_*.jar \
+    --algorithm JaccardIndex \
+    --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
 ~~~
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index b533119..e95aa37 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -83,6 +83,13 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -163,7 +170,7 @@
 				<configuration>
 					<archive>
 						<manifestEntries>
-							<Main-Class>org.apache.flink.graph.Usage</Main-Class>
+							<Main-Class>org.apache.flink.graph.Runner</Main-Class>
 						</manifestEntries>
 					</archive>
 				</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
new file mode 100644
index 0000000..0324814
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.AdamicAdar;
+import org.apache.flink.graph.drivers.ClusteringCoefficient;
+import org.apache.flink.graph.drivers.ConnectedComponents;
+import org.apache.flink.graph.drivers.Driver;
+import org.apache.flink.graph.drivers.EdgeList;
+import org.apache.flink.graph.drivers.GraphMetrics;
+import org.apache.flink.graph.drivers.HITS;
+import org.apache.flink.graph.drivers.JaccardIndex;
+import org.apache.flink.graph.drivers.PageRank;
+import org.apache.flink.graph.drivers.TriangleListing;
+import org.apache.flink.graph.drivers.input.CompleteGraph;
+import org.apache.flink.graph.drivers.input.CycleGraph;
+import org.apache.flink.graph.drivers.input.EmptyGraph;
+import org.apache.flink.graph.drivers.input.GridGraph;
+import org.apache.flink.graph.drivers.input.HypercubeGraph;
+import org.apache.flink.graph.drivers.input.Input;
+import org.apache.flink.graph.drivers.input.PathGraph;
+import org.apache.flink.graph.drivers.input.RMatGraph;
+import org.apache.flink.graph.drivers.input.SingletonEdgeGraph;
+import org.apache.flink.graph.drivers.input.StarGraph;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This default main class executes Flink drivers.
+ *
+ * An execution has one input, one algorithm, and one output. Anything more
+ * complex can be expressed as a user program written in a JVM language.
+ *
+ * Inputs and algorithms are decoupled by, respectively, producing and
+ * consuming a graph. Currently only {@code Graph} is supported but later
+ * updates may add support for new graph types such as {@code BipartiteGraph}.
+ *
+ * Algorithms must explicitly support each type of output via implementation of
+ * interfaces. This is scalable as the number of outputs is small and finite.
+ */
+public class Runner {
+
+	private static final String INPUT = "input";
+
+	private static final String ALGORITHM = "algorithm";
+
+	private static final String OUTPUT = "output";
+
+	private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
+		.addClass(CompleteGraph.class)
+		.addClass(org.apache.flink.graph.drivers.input.CSV.class)
+		.addClass(CycleGraph.class)
+		.addClass(EmptyGraph.class)
+		.addClass(GridGraph.class)
+		.addClass(HypercubeGraph.class)
+		.addClass(PathGraph.class)
+		.addClass(RMatGraph.class)
+		.addClass(SingletonEdgeGraph.class)
+		.addClass(StarGraph.class);
+
+	private static ParameterizedFactory<Driver> driverFactory = new ParameterizedFactory<Driver>()
+		.addClass(AdamicAdar.class)
+		.addClass(ClusteringCoefficient.class)
+		.addClass(ConnectedComponents.class)
+		.addClass(EdgeList.class)
+		.addClass(GraphMetrics.class)
+		.addClass(HITS.class)
+		.addClass(JaccardIndex.class)
+		.addClass(PageRank.class)
+		.addClass(TriangleListing.class);
+
+	/**
+	 * List available algorithms. This is displayed to the user when no valid
+	 * algorithm is given in the program parameterization.
+	 *
+	 * @return usage string listing available algorithms
+	 */
+	private static String getAlgorithmsListing() {
+		StrBuilder strBuilder = new StrBuilder();
+
+		strBuilder
+			.appendNewLine()
+			.appendln("Select an algorithm to view usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm <algorithm>")
+			.appendNewLine()
+			.appendln("Available algorithms:");
+
+		for (Driver algorithm : driverFactory) {
+			strBuilder.append("  ")
+				.appendFixedWidthPadRight(algorithm.getName(), 30, ' ')
+				.append(algorithm.getShortDescription()).appendNewLine();
+		}
+
+		return strBuilder.toString();
+	}
+
+	/**
+	 * Display the usage for the given algorithm. This includes options for all
+	 * compatible inputs, the selected algorithm, and outputs implemented by
+	 * the selected algorithm.
+	 *
+	 * @param algorithmName unique identifier of the selected algorithm
+	 * @return usage string for the given algorithm
+	 */
+	private static String getAlgorithmUsage(String algorithmName) {
+		StrBuilder strBuilder = new StrBuilder();
+
+		Driver algorithm = driverFactory.get(algorithmName);
+
+		strBuilder
+			.appendNewLine()
+			.appendNewLine()
+			.appendln(algorithm.getLongDescription())
+			.appendNewLine()
+			.append("usage: flink run opt/flink-gelly-examples_<version>.jar --algorithm ")
+			.append(algorithmName)
+			.append(" [algorithm options] --input <input> [input options] --output <output> [output options]")
+			.appendNewLine()
+			.appendNewLine()
+			.appendln("Available inputs:");
+
+		for (Input input : inputFactory) {
+			strBuilder
+				.append("  --input ")
+				.append(input.getName())
+				.append(" ")
+				.appendln(input.getUsage());
+		}
+
+		String algorithmParameterization = algorithm.getUsage();
+
+		if (algorithmParameterization.length() > 0) {
+			strBuilder
+				.appendNewLine()
+				.appendln("Algorithm configuration:")
+				.append("  ")
+				.appendln(algorithm.getUsage());
+		}
+
+		strBuilder
+			.appendNewLine()
+			.appendln("Available outputs:");
+
+		if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+			strBuilder.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+		}
+
+		if (algorithm instanceof Hash) {
+			strBuilder.appendln("  --output hash");
+		}
+
+		if (algorithm instanceof Print) {
+			strBuilder.appendln("  --output print");
+		}
+
+		return strBuilder
+			.appendNewLine()
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// should not have any non-Flink data types
+		env.getConfig().disableAutoTypeRegistration();
+		env.getConfig().disableForceAvro();
+		env.getConfig().disableForceKryo();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
+
+		// integration tests run with with object reuse both disabled and enabled
+		if (parameters.has("__disable_object_reuse")) {
+			env.getConfig().disableObjectReuse();
+		} else {
+			env.getConfig().enableObjectReuse();
+		}
+
+		// Usage
+
+		if (!parameters.has(ALGORITHM)) {
+			throw new ProgramParametrizationException(getAlgorithmsListing());
+		}
+
+		String algorithmName = parameters.get(ALGORITHM);
+		Driver algorithm = driverFactory.get(algorithmName);
+
+		if (algorithm == null) {
+			throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName);
+		}
+
+		if (!parameters.has(INPUT)) {
+			if (!parameters.has(OUTPUT)) {
+				// if neither input nor output is given then print algorithm usage
+				throw new ProgramParametrizationException(getAlgorithmUsage(algorithmName));
+			}
+			throw new ProgramParametrizationException("No input given");
+		}
+
+		String inputName = parameters.get(INPUT);
+		Input input = inputFactory.get(inputName);
+
+		if (input == null) {
+			throw new ProgramParametrizationException("Unknown input type: " + inputName);
+		}
+
+		// Input
+
+		input.configure(parameters);
+		Graph graph = input.create(env);
+
+		// Algorithm
+
+		algorithm.configure(parameters);
+		algorithm.plan(graph);
+
+		// Output
+		if (!parameters.has(OUTPUT)) {
+			throw new ProgramParametrizationException("No output given");
+		}
+
+		String outputName = parameters.get(OUTPUT);
+		String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> ";
+
+		System.out.println();
+
+		switch (outputName.toLowerCase()) {
+			case "csv":
+				if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
+					String filename = parameters.getRequired("output_filename");
+
+					String lineDelimiter = StringEscapeUtils.unescapeJava(
+						parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+					String fieldDelimiter = StringEscapeUtils.unescapeJava(
+						parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+					org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm;
+					c.writeCSV(filename, lineDelimiter, fieldDelimiter);
+
+					env.execute(executionNamePrefix + "CSV");
+				} else {
+					throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'");
+				}
+				break;
+
+			case "hash":
+				if (algorithm instanceof Hash) {
+					Hash h = (Hash) algorithm;
+					h.hash(executionNamePrefix + "Hash");
+				} else {
+					throw new ProgramParametrizationException("Algorithm does not support output type 'hash'");
+				}
+				break;
+
+			case "print":
+				if (algorithm instanceof Print) {
+					Print h = (Print) algorithm;
+					h.print(executionNamePrefix + "Print");
+				} else {
+					throw new ProgramParametrizationException("Algorithm does not support output type 'print'");
+				}
+				break;
+
+			default:
+				throw new ProgramParametrizationException("Unknown output type: " + outputName);
+		}
+	}
+
+	/**
+	 * Stores a list of classes for which an instance can be requested by name
+	 * and implements an iterator over class instances.
+	 *
+	 * @param <T> base type for stored classes
+	 */
+	private static class ParameterizedFactory<T extends Parameterized>
+	implements Iterable<T> {
+		private List<Class<? extends T>> classes = new ArrayList<>();
+
+		/**
+		 * Add a class to the factory.
+		 *
+		 * @param cls subclass of T
+		 * @return this
+		 */
+		public ParameterizedFactory<T> addClass(Class<? extends T> cls) {
+			this.classes.add(cls);
+			return this;
+		}
+
+		/**
+		 * Obtain a class instance by name.
+		 *
+		 * @param name String matching getName()
+		 * @return class instance or null if no matching class
+		 */
+		public T get(String name) {
+			for (T instance : this) {
+				if (name.equals(instance.getName())) {
+					return instance;
+				}
+			}
+
+			return null;
+		}
+
+		@Override
+		public Iterator<T> iterator() {
+			return new Iterator<T>() {
+				private int index;
+
+				@Override
+				public boolean hasNext() {
+					return index < classes.size();
+				}
+
+				@Override
+				public T next() {
+					return InstantiationUtil.instantiate(classes.get(index++));
+				}
+
+				@Override
+				public void remove() {
+					throw new UnsupportedOperationException();
+				}
+			};
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
deleted file mode 100644
index 642fe5b..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.flink.client.program.ProgramParametrizationException;
-
-/**
- * This default main class prints usage listing available classes.
- */
-public class Usage {
-
-	private static final Class[] DRIVERS = new Class[]{
-		org.apache.flink.graph.drivers.ClusteringCoefficient.class,
-		org.apache.flink.graph.drivers.GraphMetrics.class,
-		org.apache.flink.graph.drivers.HITS.class,
-		org.apache.flink.graph.drivers.JaccardIndex.class,
-		org.apache.flink.graph.drivers.TriangleListing.class,
-	};
-
-	private static final Class[] EXAMPLES = new Class[]{
-		org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
-		org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
-		org.apache.flink.graph.examples.IncrementalSSSP.class,
-		org.apache.flink.graph.examples.MusicProfiles.class,
-		org.apache.flink.graph.examples.PregelSSSP.class,
-		org.apache.flink.graph.examples.SingleSourceShortestPaths.class,
-		org.apache.flink.graph.scala.examples.ConnectedComponents.class,
-		org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class,
-		org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
-	};
-
-	private static String getUsage() {
-		StrBuilder strBuilder = new StrBuilder();
-
-		strBuilder.appendNewLine();
-		strBuilder.appendln("Driver classes call algorithms from the Gelly library:");
-		for (Class cls : DRIVERS) {
-			strBuilder.append("  ").appendln(cls.getName());
-		}
-
-		strBuilder.appendNewLine();
-		strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:");
-		for (Class cls : EXAMPLES) {
-			strBuilder.append("  ").appendln(cls.getName());
-		}
-
-		return strBuilder.toString();
-	}
-
-	public static void main(String[] args) throws Exception {
-		// this exception is throw to prevent Flink from printing an error message
-		throw new ProgramParametrizationException(getUsage());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
new file mode 100644
index 0000000..a48fdf1
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/RunnerITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.drivers.DriverBaseITCase;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RunnerITCase
+extends DriverBaseITCase {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	public RunnerITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testWithoutAlgorithm() throws Exception {
+		String expected = "Select an algorithm to view usage:";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(new String[]{}, expected);
+	}
+
+	@Test
+	public void testWithUnknownAlgorithm() throws Exception {
+		String expected = "Unknown algorithm name: NotAnAlgorithm";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(new String[]{"--algorithm", "NotAnAlgorithm"}, expected);
+	}
+
+	@Test
+	public void testAlgorithmUsage() throws Exception {
+		String expected = "Pass-through of the graph's edge list.";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(new String[]{"--algorithm", "EdgeList"}, expected);
+	}
+
+	@Test
+	public void testWithoutInput() throws Exception {
+		String expected = "No input given";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--output", "NotAnOutput"},
+			expected);
+	}
+
+	@Test
+	public void testWithUnknownInput() throws Exception {
+		String expected = "Unknown input type: NotAnInput";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "NotAnInput"},
+			expected);
+	}
+
+	@Test
+	public void testWithoutOutput() throws Exception {
+		String expected = "No output given";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph"},
+			expected);
+	}
+
+	@Test
+	public void testWithUnknownOutput() throws Exception {
+		String expected = "Unknown output type: NotAnOutput";
+
+		thrown.expect(ProgramParametrizationException.class);
+		thrown.expectMessage(expected);
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph",
+				"--output", "NotAnOutput"},
+			expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
new file mode 100644
index 0000000..400c241
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/AdamicAdarITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AdamicAdarITCase
+extends DriverBaseITCase {
+
+	public AdamicAdarITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new AdamicAdar().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "AdamicAdar"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "AdamicAdar",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "print"},
+			221628);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
new file mode 100644
index 0000000..f215b91
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ClusteringCoefficientITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ClusteringCoefficientITCase
+extends DriverBaseITCase {
+
+	public ClusteringCoefficientITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new ClusteringCoefficient().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "ClusteringCoefficient"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"ChecksumHashCode 0x000001c0409df6c0, count 902\n" +
+			"triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+			"vertex count: 902, average clustering coefficient: 0.32943748[0-9]+\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "directed",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "print"},
+			904);
+	}
+
+	@Test
+	public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"ChecksumHashCode 0x000001ccf8c45fdb, count 902\n" +
+			"triplet count: 1003442, triangle count: 225147, global clustering coefficient: 0.22437470[0-9]+\n" +
+			"vertex count: 902, average clustering coefficient: 0.42173070[0-9]+\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "ClusteringCoefficient", "--order", "undirected",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "print"},
+			904);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..b91abb3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/ConnectedComponentsITCase.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase
+extends DriverBaseITCase {
+
+	public ConnectedComponentsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new ConnectedComponents().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "ConnectedComponents"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\\nChecksumHashCode 0x0000000000cdc7e7, count 838\\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "ConnectedComponents",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+					"--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "ConnectedComponents",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected", "--edge_factor", "1",
+					"--a", "0.25", "--b", "0.25", "--c", "0.25", "--noise_enabled", "--noise", "1.0",
+				"--output", "print"},
+			838);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
new file mode 100644
index 0000000..d19ca97
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/DriverBaseITCase.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.graph.Runner;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.regex.Pattern;
+
+public abstract class DriverBaseITCase
+extends MultipleProgramsTestBase {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	protected DriverBaseITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	// extend MultipleProgramsTestBase default to include object reuse mode
+	@Parameterized.Parameters(name = "Execution mode = {0}")
+	public static Collection<Object[]> executionModes() {
+		return Arrays.asList(
+			new Object[] { TestExecutionMode.CLUSTER },
+			new Object[] { TestExecutionMode.CLUSTER_OBJECT_REUSE },
+			new Object[] { TestExecutionMode.COLLECTION });
+	}
+
+	/**
+	 * Simpler variant of {@link #expectedOutput(String[], String)}
+	 * that only compares the count of the number of records in standard output.
+	 * This is intended for use for algorithms where the result cannot be
+	 * hashed due to approximate results (typically floating point arithmetic).
+	 *
+	 * @param parameters algorithm, input, and output arguments
+	 * @param records expected number of records in standard output
+	 * @throws Exception on error
+	 */
+	protected void expectedCount(String[] parameters, int records) throws Exception {
+		String output = getSystemOutput(parameters);
+
+		// subtract the extra newline
+		int numberOfRecords = output.split(System.getProperty("line.separator")).length - 1;
+		Assert.assertEquals(records, numberOfRecords);
+	}
+
+	/**
+	 * Executes the driver with the provided arguments and compares the
+	 * standard output with the given regular expression.
+	 *
+	 * @param parameters algorithm, input, and output arguments
+	 * @param expected expected standard output
+	 * @throws Exception on error
+	 */
+	protected void expectedOutput(String[] parameters, String expected) throws Exception {
+		String output = getSystemOutput(parameters);
+
+		Assert.assertThat(output, RegexMatcher.matchesRegex(expected));
+	}
+
+	/**
+	 * Executes the driver with the provided arguments and compares the
+	 * exception and exception method with the given class and regular
+	 * expression.
+	 *
+	 * @param parameters algorithm, input, and output arguments
+	 * @param expected expected standard output
+	 * @param exception expected exception
+	 * @throws Exception on error when not matching exception
+	 */
+	protected void expectedOutputFromException(String[] parameters, String expected,Class<? extends Throwable> exception) throws Exception {
+		expectedException.expect(exception);
+		expectedException.expectMessage(RegexMatcher.matchesRegex(expected));
+
+		getSystemOutput(parameters);
+	}
+
+	/**
+	 * Generate a regular expression string by quoting the input string and
+	 * adding wildcard matchers to the beginning and end.
+	 *
+	 * @param input source string
+	 * @return regex string
+	 */
+	protected String regexSubstring(String input) {
+		// Pattern.quote disables regex interpretation of the input string and
+		// flag expression "(?s)" (Pattern.DOTALL) matches "." against any
+		// character including line terminators
+		return "(?s).*" + Pattern.quote(input) + ".*";
+	}
+
+	/**
+	 * Capture the command-line standard output from the driver execution.
+	 *
+	 * @param args driver command-line arguments
+	 * @return standard output from driver execution
+	 * @throws Exception on error
+	 */
+	private String getSystemOutput(String[] args) throws Exception {
+		ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+		// Configure object reuse mode
+		switch (mode) {
+			case CLUSTER:
+			case COLLECTION:
+				args = (String[])ArrayUtils.add(args, "--__disable_object_reuse");
+				break;
+
+			case CLUSTER_OBJECT_REUSE:
+				// object reuse is enabled by default when executing drivers
+				break;
+
+			default:
+				throw new FlinkRuntimeException("Unknown execution mode " + mode);
+		}
+
+		// Redirect stdout
+		PrintStream stdout = System.out;
+		System.setOut(new PrintStream(output));
+
+		Runner.main(args);
+
+		// Restore stdout
+		System.setOut(stdout);
+
+		return output.toString();
+	}
+
+	/**
+	 * Implements a Hamcrest regex matcher. Hamcrest 2.0 provides
+	 * Matchers.matchesPattern(String) but Flink depends on Hamcrest 1.3.
+	 *
+	 * see http://stackoverflow.com/a/25021229
+	 */
+	private static class RegexMatcher
+	extends TypeSafeMatcher<String> {
+		private final String regex;
+
+		private RegexMatcher(final String regex) {
+			this.regex = regex;
+		}
+
+		@Override
+		public void describeTo(final Description description) {
+			description.appendText("matches regex=`" + regex + "`");
+		}
+
+		@Override
+		public boolean matchesSafely(final String string) {
+			return string.matches(regex);
+		}
+
+		public static RegexMatcher matchesRegex(final String regex) {
+			return new RegexMatcher(regex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
new file mode 100644
index 0000000..d9cac8b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class EdgeListITCase
+extends DriverBaseITCase {
+
+	public EdgeListITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new EdgeList().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "EdgeList"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testHashWithCompleteGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000000006788c22, count 1722\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "CompleteGraph", "--vertex_count", "42",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithCycleGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x000000000050cea4, count 84\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "CycleGraph", "--vertex_count", "42",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithEmptyGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000000000000000, count 0\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "EmptyGraph", "--vertex_count", "42",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithGridGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000000357d33a6, count 2990\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "GridGraph", "--dim0", "5:true", "--dim1", "8:false", "--dim2", "13:true",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithHypercubeGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000000014a72800, count 2048\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "HypercubeGraph", "--dimensions", "8",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithPathGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000000004ee21a, count 82\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "PathGraph", "--vertex_count", "42",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000000ed469103, count 16384\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "integer",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatIntegerDirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000000c53bfc9b, count 12009\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatIntegerUndirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000001664eb9e4, count 20884\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatLongGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000000116ee9103, count 16384\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "long",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testPrintWithRMatLongGraph() throws Exception {
+
+		expectedCount(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "long",
+				"--output", "print"},
+			16384);
+	}
+
+	@Test
+	public void testHashWithRMatLongDirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000000e3c4643b, count 12009\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "long", "--simplify", "directed",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatLongUndirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x000000019b67ae64, count 20884\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "long", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatStringGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x00000071dc80a623, count 16384\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "string",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatStringDirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000005d58b3fa7d, count 12009\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "string", "--simplify", "directed",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithRMatStringUndirectedGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x000000aa54987304, count 20884\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "RMatGraph", "--type", "string", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithSingletonEdgeGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0000000001af8ee8, count 200\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "SingletonEdgeGraph", "--vertex_pair_count", "100",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testHashWithStarGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x000000000042789a, count 82\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "EdgeList",
+				"--input", "StarGraph", "--vertex_count", "42",
+				"--output", "hash"},
+			expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
new file mode 100644
index 0000000..a5ea486
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphMetricsITCase
+extends DriverBaseITCase {
+
+	public GraphMetricsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new GraphMetrics().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "GraphMetrics"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testWithDirectedRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"Vertex metrics:\n" +
+			"  vertex count: 902\n" +
+			"  edge count: 12,009\n" +
+			"  unidirectional edge count: 8,875\n" +
+			"  bidirectional edge count: 1,567\n" +
+			"  average degree: 13.314\n" +
+			"  density: 0.01477663\n" +
+			"  triplet count: 1,003,442\n" +
+			"  maximum degree: 463\n" +
+			"  maximum out degree: 334\n" +
+			"  maximum in degree: 342\n" +
+			"  maximum triplets: 106,953\n" +
+			"\n" +
+			"Edge metrics:\n" +
+			"  triangle triplet count: 107,817\n" +
+			"  rectangle triplet count: 315,537\n" +
+			"  maximum triangle triplets: 820\n" +
+			"  maximum rectangle triplets: 3,822\n";
+
+		String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed",
+			"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+			"--output"};
+
+		expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+		expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+	}
+
+	@Test
+	public void testWithUndirectedRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"Vertex metrics:\n" +
+			"  vertex count: 902\n" +
+			"  edge count: 10,442\n" +
+			"  average degree: 23.153\n" +
+			"  density: 0.025697\n" +
+			"  triplet count: 1,003,442\n" +
+			"  maximum degree: 463\n" +
+			"  maximum triplets: 106,953\n" +
+			"\n" +
+			"Edge metrics:\n" +
+			"  triangle triplet count: 107,817\n" +
+			"  rectangle triplet count: 315,537\n" +
+			"  maximum triangle triplets: 820\n" +
+			"  maximum rectangle triplets: 3,822\n";
+
+		String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output"};
+
+		expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
+		expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
new file mode 100644
index 0000000..5474d1b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class HITSITCase
+extends DriverBaseITCase {
+
+	public HITSITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new HITS().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "HITS"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "HITS",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "print"},
+			902);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
new file mode 100644
index 0000000..0632856
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JaccardIndexITCase
+extends DriverBaseITCase {
+
+	public JaccardIndexITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new JaccardIndex().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "JaccardIndex"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "JaccardIndex",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "JaccardIndex",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "print"},
+			221628);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
new file mode 100644
index 0000000..d7301d0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase
+extends DriverBaseITCase {
+
+	public PageRankITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new PageRank().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "PageRank"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "PageRank",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "print"},
+			902);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ff99fd/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
new file mode 100644
index 0000000..0d2897c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TriangleListingITCase
+extends DriverBaseITCase {
+
+	public TriangleListingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testLongDescription() throws Exception {
+		String expected = regexSubstring(new TriangleListing().getLongDescription());
+
+		expectedOutputFromException(
+			new String[]{"--algorithm", "TriangleListing"},
+			expected,
+			ProgramParametrizationException.class);
+	}
+
+	@Test
+	public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+			"Triadic census:\n" +
+			"  003: 113,435,893\n" +
+			"  012: 6,632,528\n" +
+			"  102: 983,535\n" +
+			"  021d: 118,574\n" +
+			"  021u: 118,566\n" +
+			"  021c: 237,767\n" +
+			"  111d: 129,773\n" +
+			"  111u: 130,041\n" +
+			"  030t: 16,981\n" +
+			"  030c: 5,535\n" +
+			"  201: 43,574\n" +
+			"  120d: 7,449\n" +
+			"  120u: 7,587\n" +
+			"  120c: 15,178\n" +
+			"  210: 17,368\n" +
+			"  300: 4,951\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "TriangleListing", "--order", "directed",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
+				"--output", "print"},
+			75049);
+	}
+
+	@Test
+	public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+		String expected = "\n" +
+			"ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+			"Triadic census:\n" +
+			"  03: 113,435,893\n" +
+			"  12: 7,616,063\n" +
+			"  21: 778,295\n" +
+			"  30: 75,049\n";
+
+		expectedOutput(
+			new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "hash"},
+			expected);
+	}
+
+	@Test
+	public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
+		expectedCount(
+			new String[]{"--algorithm", "TriangleListing", "--order", "undirected",
+				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
+				"--output", "print"},
+			75049);
+	}
+}