You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/24 16:32:46 UTC

[1/2] flink git commit: [FLINK-4204] [gelly] Clean up gelly-examples

Repository: flink
Updated Branches:
  refs/heads/master c4783c856 -> c4f9f0d78


http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
deleted file mode 100644
index 615d765..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementations of Global and Local Clustering Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
- *
- * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
- */
-public class ClusteringCoefficient {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() { // Prints the algorithm summary followed by the command-line synopsis to stdout.
-		String description = "The local clustering coefficient measures the connectedness of each" +
-			" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
-			" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
-			" is a clique).";
-		String resultFormat = "This algorithm returns tuples containing the vertex ID, the degree of" +
-			" the vertex, and the number of edges between vertex neighbors.";
-		String[] lines = { WordUtils.wrap(description, 80), "", WordUtils.wrap(resultFormat, 80), "", // "" entries become blank separator lines
-			"usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>", "",
-			"options:",
-			"  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]",
-			"  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]", "",
-			"  --output print",
-			"  --output hash",
-			"  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]" };
-		for (String line : lines) {
-			System.out.println(line);
-		}
-	}
-
-	public static void main(String[] args) throws Exception { // Driver entry point: builds the input graph, runs the global, average, and local clustering coefficient, then emits the results.
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		if (! parameters.has("directed")) { // --directed is mandatory; without it only the usage text is printed
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); // forwarded to every setParallelism / setLittleParallelism call below
-
-		// global, average, and local clustering coefficient results; declared as raw types because the vertex ID type differs per branch
-		GraphAnalytic gcc;
-		GraphAnalytic acc;
-		DataSet lcc;
-
-		switch (parameters.get("input", "")) { // a missing --input yields "" and ends in the default case (usage)
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#") // presumably skips edge lines starting with "#" -- confirm against GraphCsvReader
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) { // --type selects the vertex ID class used to read the CSV
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale; // 2^scale vertices
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) { // keep LongValue IDs; presumably because they would not fit the unsigned-int translation used in the else branch
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else { // translate the generated LongValue IDs to IntValue before simplifying
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); // only read for the undirected Simplify
-
-					if (scale > 32) { // same LongValue / IntValue split as in the directed branch
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				}
-			} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) { // a missing or unknown --output prints the usage text and returns
-			case "print":
-				if (directedAlgorithm) {
-					for (Object e: lcc.collect()) { // lcc is a raw DataSet, hence the cast to the directed Result type
-						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
-					}
-				} else {
-					for (Object e: lcc.collect()) { // same loop with the undirected Result type
-						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
-					}
-				}
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(lcc)); // prints a checksum of the local results instead of the results themselves
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute("Clustering Coefficient"); // the only output mode that calls execute() explicitly
-
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult(); // result of the most recently executed job
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
deleted file mode 100644
index 73bba2c..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
- */
-public class Graph500 {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_SIMPLIFY = false;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	public static void main(String[] args) throws Exception { // Generates an RMat graph and prints, hashes, or writes its edge list as CSV.
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		// Generate RMat graph
-		int scale = parameters.getInt("scale", DEFAULT_SCALE);
-		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-		long vertexCount = 1L << scale; // 2^scale vertices
-		long edgeCount = vertexCount * edgeFactor; // edge_factor * 2^scale edges, duplicates included
-
-		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
-		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); // only used when simplify is set
-
-		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.generate();
-
-		if (simplify) {
-			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-		}
-
-		DataSet<Tuple2<LongValue,LongValue>> edges = graph
-			.getEdges()
-			.project(0, 1); // keep only the source and target IDs
-
-		// Print, hash, or write RMat graph to disk
-		switch (parameters.get("output", "")) {
-		case "print":
-			edges.print();
-			break;
-
-		case "hash":
-			System.out.println(DataSetUtils.checksumHashCode(edges));
-			break;
-
-		case "csv":
-			String filename = parameters.get("filename");
-			// Unescape the delimiters so that a two-character "\n" or "\t" typed on the command line becomes the control character.
-			String rowDelimiter = org.apache.commons.lang3.StringEscapeUtils.unescapeJava(parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-			String fieldDelimiter = org.apache.commons.lang3.StringEscapeUtils.unescapeJava(parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-			edges.writeAsCsv(filename, rowDelimiter, fieldDelimiter);
-
-			env.execute();
-			break;
-		default: // missing or unknown --output: describe the generator and its options, then exit without running a job
-			System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator.");
-			System.out.println();
-			System.out.println("The graph matrix contains 2^scale vertices although not every vertex will");
-			System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges");
-			System.out.println("although some edges may be duplicates.");
-			System.out.println();
-			System.out.println("Note: this does not yet implement permutation of vertex labels or edges.");
-			System.out.println();
-			System.out.println("usage:");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
-					" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
-
-			return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult(); // result of the most recently executed job
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
deleted file mode 100644
index e7b47bf..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.utils.ExampleUtils;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-public class GraphMetrics implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		/** create the graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
-		
-		/** get the number of vertices **/
-		long numVertices = graph.numberOfVertices();
-		
-		/** get the number of edges **/
-		long numEdges = graph.numberOfEdges();
-		
-		/** compute the average node degree **/
-		DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
-
-		DataSet<Double> avgNodeDegree = verticesWithDegrees
-				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
-		
-		/** find the vertex with the maximum in-degree **/
-		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum in-degree **/
-		DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the maximum out-degree **/
-		DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum out-degree **/
-		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
-		
-		/** print the results **/
-		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
-		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
-		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
-		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
-		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
-		ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree");
-		ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree");
-
-		env.execute();
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> { // Turns the summed vertex degrees into the average degree.
-
-		private final long numberOfVertices; // divisor: total number of vertices in the graph
-
-		public AvgNodeDegreeMapper(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		public Double map(Tuple2<Long, LongValue> sumTuple) { // sumTuple.f1 holds the degree sum; returns sum / numberOfVertices as a double
-			return (double) sumTuple.f1.getValue() / numberOfVertices; // cast before dividing: long / long would drop the fractional part
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> { // Keeps field 0 (the Long ID) of a two-field tuple.
-		@Override public Long map(Tuple2<Long, LongValue> vertexWithDegree) { return vertexWithDegree.f0; }
-	}
-
-	@Override
-	public String getDescription() { // Returns the fixed, human-readable label of this example program.
-		return "Graph Metrics Example";
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static String edgesInputPath = null;
-
-	static final int NUM_VERTICES = 100;
-
-	static final long SEED = 9876;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 1) {
-				System.err.println("Usage: GraphMetrics <input edges>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgesInputPath = args[0];
-		} else {
-			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("Usage: GraphMetrics <input edges>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { // Supplies the edges: read from a file or generated.
-		if (!fileOutput) { // no input path was given: fall back to generated edges
-			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
-		}
-		// fileOutput (despite its name) marks that an input path was supplied; each line is "source<TAB>target".
-		MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>> toEdge =
-				new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-					public Edge<Long, NullValue> map(Tuple2<Long, Long> pair) {
-						return new Edge<Long, NullValue>(pair.f0, pair.f1, NullValue.getInstance());
-					}
-				};
-		return env.readCsvFile(edgesInputPath)
-				.lineDelimiter("\n").fieldDelimiter("\t")
-				.types(Long.class, Long.class)
-				.map(toEdge);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
deleted file mode 100644
index f70d5dc..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
- */
-public class HITS {
-
-	public static final int DEFAULT_ITERATIONS = 10;
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
-			" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
-			" and good \"authorities\" are linked from good \"hubs\".", 80));
-		System.out.println();
-		System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-		System.out.println();
-		System.out.println("  --output print");
-		System.out.println("  --output hash");
-		System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
-
-		DataSet hits;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						hits = reader
-							.keyType(LongValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-					} break;
-
-					case "string": {
-						hits = reader
-							.keyType(StringValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (scale > 32) {
-					hits = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-				} else {
-					hits = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-						.run(new Simplify<IntValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
-				}
-				} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				for (Object e: hits.collect()) {
-					System.out.println(((Result)e).toVerboseString());
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(hits));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
deleted file mode 100644
index 2845e2d..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
- */
-public class JaccardIndex {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
-				" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
-				" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
-				" shared).", 80))
-			.appendNewLine()
-			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
-				" number of shared neighbors, and the number of distinct neighbors.", 80))
-			.appendNewLine()
-			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-		DataSet ji;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						ji = reader
-							.keyType(LongValue.class)
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					case "string": {
-						ji = reader
-							.keyType(StringValue.class)
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-				if (scale > 32) {
-					ji = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				} else {
-					ji = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-							.setParallelism(little_parallelism))
-						.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				}
-				} break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				for (Object e: ji.collect()) {
-					Result result = (Result)e;
-					System.out.println(result.toVerboseString());
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(ji));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
deleted file mode 100644
index 43c5eba..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
- *
- * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
- * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
- */
-public class TriangleListing {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80));
-		System.out.println();
-		System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
-			" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
-		System.out.println();
-		System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-		System.out.println();
-		System.out.println("  --output print");
-		System.out.println("  --output hash");
-		System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		if (! parameters.has("directed")) {
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		DataSet tl;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
-						} else {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
-						} else {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-
-
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
-					} else {
-						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					graph = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
-					if (scale > 32) {
-						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
-					} else {
-						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
-					}
-				}
-			} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				if (directedAlgorithm) {
-					for (Object e: tl.collect()) {
-						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
-							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-						System.out.println(result.toVerboseString());
-					}
-				} else {
-					tl.print();
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(tl));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
deleted file mode 100644
index b1bc831..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples.utils;
-
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-public class ExampleUtils {
-
-	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-	public static void printResult(DataSet set, String msg) {
-		set.output(new PrintingOutputFormatWithMessage(msg) {
-		});
-	}
-
-	public static class PrintingOutputFormatWithMessage<T> implements
-			OutputFormat<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		private transient PrintStream stream;
-
-		private transient String prefix;
-
-		private String message;
-
-		// --------------------------------------------------------------------------------------------
-
-		/**
-		 * Instantiates a printing output format that prints to standard out.
-		 */
-		public PrintingOutputFormatWithMessage() {
-		}
-
-		public PrintingOutputFormatWithMessage(String msg) {
-			this.message = msg;
-		}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {
-			// get the target stream
-			this.stream = System.out;
-
-			// set the prefix to message
-			this.prefix = message + ": ";
-		}
-
-		@Override
-		public void writeRecord(T record) {
-			if (this.prefix != null) {
-				this.stream.println(this.prefix + record.toString());
-			} else {
-				this.stream.println(record.toString());
-			}
-		}
-
-		@Override
-		public void close() {
-			this.stream = null;
-			this.prefix = null;
-		}
-
-		@Override
-		public String toString() {
-			return "Print to System.out";
-		}
-
-		@Override
-		public void configure(Configuration parameters) {
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static DataSet<Vertex<Long, NullValue>> getVertexIds(
-			ExecutionEnvironment env, final long numVertices) {
-		return env.generateSequence(1, numVertices).map(
-				new MapFunction<Long, Vertex<Long, NullValue>>() {
-					public Vertex<Long, NullValue> map(Long l) {
-						return new Vertex<Long, NullValue>(l, NullValue
-								.getInstance());
-					}
-				});
-	}
-
-	@SuppressWarnings("serial")
-	public static DataSet<Edge<Long, NullValue>> getRandomEdges(
-			ExecutionEnvironment env, final long numVertices) {
-		return env.generateSequence(1, numVertices).flatMap(
-				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-					@Override
-					public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
-						int numOutEdges = (int) (Math.random() * (numVertices / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numVertices) + 1;
-							out.collect(new Edge<Long, NullValue>(key, target,
-									NullValue.getInstance()));
-						}
-					}
-				});
-	}
-
-	public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
-			ExecutionEnvironment env) {
-		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
-		vertices.add(new Vertex<Long, Double>(1L, 1.0));
-		vertices.add(new Vertex<Long, Double>(2L, 2.0));
-		vertices.add(new Vertex<Long, Double>(3L, 3.0));
-		vertices.add(new Vertex<Long, Double>(4L, 4.0));
-		vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-		return env.fromCollection(vertices);
-	}
-
-	public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
-			ExecutionEnvironment env) {
-		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
-		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-		return env.fromCollection(edges);
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private ExampleUtils() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
deleted file mode 100644
index ebf43d4..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.examples
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            val target: Long = ((Math.random() * numVertices) + 1).toLong
-            new Edge[Long, NullValue](key, target, NullValue.getInstance())
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 167e31c..b3e1e30 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -367,6 +367,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triangle triplets.
 		 *
 		 * @return number of triangle triplets
@@ -453,6 +462,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
 				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
 				+ "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 22f7733..909eea5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -228,6 +228,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triplets.
 		 *
 		 * @return number of triplets
@@ -278,6 +287,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum out degree: " + nf.format(maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 1d5b664..6bce42c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -329,6 +329,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triangle triplets.
 		 *
 		 * @return number of triangle triplets
@@ -397,6 +406,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
 				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
 				+ "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index d04fa7b..8012605 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -230,6 +230,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triplets.
 		 *
 		 * @return number of triplets
@@ -262,6 +271,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum triplets: " + nf.format(maximumTriplets);


[2/2] flink git commit: [FLINK-4204] [gelly] Clean up gelly-examples

Posted by gr...@apache.org.
[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into separate package. Adds default main class to print
usage listing included classes. Includes documentation for running
Gelly examples.

This closes #2670


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4f9f0d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4f9f0d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4f9f0d7

Branch: refs/heads/master
Commit: c4f9f0d78d65a7ce7290820ae9fb2919c9116e57
Parents: c4783c8
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Aug 24 11:32:43 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 24 12:14:05 2016 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md                    |  61 +++-
 flink-libraries/flink-gelly-examples/pom.xml    |  13 +
 .../main/java/org/apache/flink/graph/Usage.java |  60 ++++
 .../apache/flink/graph/driver/GraphMetrics.java | 232 -------------
 .../graph/drivers/ClusteringCoefficient.java    | 343 +++++++++++++++++++
 .../apache/flink/graph/drivers/Graph500.java    | 141 ++++++++
 .../flink/graph/drivers/GraphMetrics.java       | 234 +++++++++++++
 .../org/apache/flink/graph/drivers/HITS.java    | 187 ++++++++++
 .../flink/graph/drivers/JaccardIndex.java       | 222 ++++++++++++
 .../flink/graph/drivers/TriangleListing.java    | 251 ++++++++++++++
 .../graph/examples/ClusteringCoefficient.java   | 326 ------------------
 .../apache/flink/graph/examples/Graph500.java   | 129 -------
 .../flink/graph/examples/GraphMetrics.java      | 171 ---------
 .../org/apache/flink/graph/examples/HITS.java   | 185 ----------
 .../flink/graph/examples/JaccardIndex.java      | 208 -----------
 .../flink/graph/examples/TriangleListing.java   | 230 -------------
 .../graph/examples/utils/ExampleUtils.java      | 162 ---------
 .../graph/scala/examples/GraphMetrics.scala     | 129 -------
 .../library/metric/directed/EdgeMetrics.java    |  10 +
 .../library/metric/directed/VertexMetrics.java  |  10 +
 .../library/metric/undirected/EdgeMetrics.java  |  10 +
 .../metric/undirected/VertexMetrics.java        |  10 +
 22 files changed, 1551 insertions(+), 1773 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 2eeec2c..db7073f 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -64,6 +64,65 @@ Add the following dependency to your `pom.xml` to use Gelly.
 
 Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API. After reading this guide, you might also want to check the {% gh_link /flink-libraries/flink-gelly-examples/ "Gelly examples" %}.
+The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API.
+
+Running Gelly Examples
+----------------------
+
+The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+in the folder **opt/lib/gelly** (for versions older than Flink 1.2 these can be manually downloaded from
+[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)).
+
+To run the Gelly examples, the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
+Flink's **lib** directory.
+
+~~~bash
+cp opt/lib/gelly/flink-gelly_*.jar lib/
+cp opt/lib/gelly/flink-gelly-scala_*.jar lib/
+~~~
+
+Gelly's examples jar includes drivers for the library methods as well as additional example algorithms. After
+configuring and starting the cluster, list the available algorithm classes:
+
+~~~bash
+./bin/start-cluster.sh
+./bin/flink run opt/lib/gelly/flink-gelly-examples_*.jar
+~~~
+
+The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
+edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a
+directed generated graph:
+
+~~~bash
+./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input rmat
+~~~
+
+The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
+[library generator](./graph_generators.html#rmat-graph) provides access to additional configuration to adjust the
+power-law skew and random noise.
+
+Sample social network data is provided by the [Stanford Network Analysis Project](http://snap.stanford.edu/data/index.html).
+The [com-lj](http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz) data set is a good starter size.
+Run a few algorithms and monitor the job progress in Flink's Web UI:
+
+~~~bash
+wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt  --input_field_delimiter '\t' \
+    --output hash
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/lib/gelly/flink-gelly-examples_*.jar \
+    --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+    --output hash
+~~~
+
+Please submit feature requests and report issues on the user [mailing list](https://flink.apache.org/community.html#mailing-lists)
+or [Flink Jira](https://issues.apache.org/jira/browse/FLINK). We welcome suggestions for new algorithms and features as
+well as [code contributions](https://flink.apache.org/contribute-code.html).
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index 80da0ff..9b90b04 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -143,6 +143,19 @@
 				</configuration>
 			</plugin>
 
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.5</version>
+				<configuration>
+					<archive>
+						<manifestEntries>
+							<Main-Class>org.apache.flink.graph.Usage</Main-Class>
+						</manifestEntries>
+					</archive>
+				</configuration>
+			</plugin>
+
 			<!-- Adding scala source directories to build path -->
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
new file mode 100644
index 0000000..9d8f116
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * This default main class prints a usage message listing the available driver and example classes.
+ */
+public class Usage {
+
+	private static final Class[] DRIVERS = new Class[]{
+		org.apache.flink.graph.drivers.ClusteringCoefficient.class,
+		org.apache.flink.graph.drivers.Graph500.class,
+		org.apache.flink.graph.drivers.GraphMetrics.class,
+		org.apache.flink.graph.drivers.HITS.class,
+		org.apache.flink.graph.drivers.JaccardIndex.class,
+		org.apache.flink.graph.drivers.TriangleListing.class,
+	};
+
+	private static final Class[] EXAMPLES = new Class[]{
+		org.apache.flink.graph.examples.ConnectedComponents.class,
+		org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
+		org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
+		org.apache.flink.graph.examples.IncrementalSSSP.class,
+		org.apache.flink.graph.examples.MusicProfiles.class,
+		org.apache.flink.graph.examples.PregelSSSP.class,
+		org.apache.flink.graph.examples.SingleSourceShortestPaths.class,
+		org.apache.flink.graph.scala.examples.ConnectedComponents.class,
+		org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class,
+		org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
+	};
+
+	public static void main(String[] args) throws Exception {
+		System.out.println("Driver classes call algorithms from the Gelly library:");
+		for (Class cls : DRIVERS) {
+			System.out.println("  " + cls.getName());
+		}
+
+		System.out.println("");
+		System.out.println("Example classes illustrate Gelly APIs or alternative algorithms:");
+		for (Class cls : EXAMPLES) {
+			System.out.println("  " + cls.getName());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
deleted file mode 100644
index 79c5f80..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.driver;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Computes vertex and edge metrics on a directed or undirected graph.
- *
- * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
- * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
- * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
- * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
- */
-public class GraphMetrics {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80));
-		System.out.println();
-		System.out.println("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		if (! parameters.has("directed")) {
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		GraphAnalytic vm;
-		GraphAnalytic em;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				}
-				} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		env.execute("Graph Metrics");
-
-		System.out.print("Vertex metrics:\n  ");
-		System.out.println(vm.getResult().toString().replace(";", "\n "));
-		System.out.print("\nEdge metrics:\n  ");
-		System.out.println(em.getResult().toString().replace(";", "\n "));
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
new file mode 100644
index 0000000..18b0406
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for the library implementations of Global and Local Clustering Coefficient.
+ *
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor, then calculates
+ * the local clustering coefficient for each vertex and the global and average
+ * clustering coefficients for the graph.
+ *
+ * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
+ */
+public class ClusteringCoefficient {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Builds the command-line help text for this driver, ending with the
+	 * specific usage error so the user can see what was wrong.
+	 *
+	 * @param message description of the usage error that triggered the help text
+	 * @return the algorithm description, option summary, and usage error
+	 */
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
+				" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
+				" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
+				" is a clique).", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
+				" the vertex, and the number of edges between vertex neighbors.", 80))
+			.appendNewLine()
+			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			// Report the caller-supplied error; previously the message was silently dropped.
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	/**
+	 * Driver entry point. Parses the command-line options, builds the global,
+	 * average, and local clustering coefficient computations for the selected
+	 * input graph, emits the local results through the selected output, and
+	 * finally prints the global and average results and the net job runtime.
+	 *
+	 * @param args command-line options parsed by {@link ParameterTool}
+	 * @throws Exception a {@link ProgramParametrizationException} is thrown for a
+	 *         missing {@code --directed} flag or an unrecognized input, CSV type,
+	 *         or output option; other exceptions propagate from the calls below
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// --directed has no default: the caller must pick the algorithm variant explicitly
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		// optional; falls back to PARALLELISM_DEFAULT when not given
+		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
+		// global and local clustering coefficient results
+		// (raw types: the ID type parameter differs between the branches below)
+		GraphAnalytic gcc;
+		GraphAnalytic acc;
+		DataSet lcc;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				// delimiters are given in Java-escaped form (e.g. "\t") and unescaped here
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				// --type selects the vertex ID type used to read the edge list
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							// simplification of CSV input is opt-in (default false)
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				// the generator is asked for 2^scale vertices and edgeFactor edges per vertex
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.setParallelism(little_parallelism)
+					.generate();
+
+				// Generated graphs are always simplified. For scale <= 32 the IDs are
+				// first translated from LongValue to IntValue.
+				// NOTE(review): presumably because 2^scale IDs then fit an unsigned 32-bit value - confirm
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					}
+				} else {
+					// only the undirected Simplify takes the clip-and-flip flag
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					}
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		// Emit the local clustering coefficient results. The global and average
+		// results are read only after this step.
+		switch (parameters.get("output", "")) {
+			case "print":
+				// lcc is raw-typed, so each element is cast to the Result class of the chosen variant
+				if (directedAlgorithm) {
+					for (Object e: lcc.collect()) {
+						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
+							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
+						System.out.println(result.toVerboseString());
+					}
+				} else {
+					for (Object e: lcc.collect()) {
+						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
+							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
+						System.out.println(result.toVerboseString());
+					}
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(lcc));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				// this is the only output branch that calls env.execute() itself
+				env.execute("Clustering Coefficient");
+				break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		System.out.println(gcc.getResult());
+		System.out.println(acc.getResult());
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
new file mode 100644
index 0000000..8f9a54a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Generate an RMat graph for Graph 500.
+ *
+ * Note that this does not yet implement permutation of vertex labels or edges.
+ *
+ * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
+ */
+public class Graph500 {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_SIMPLIFY = false;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Assembles the help text shown when the driver is invoked incorrectly.
+	 *
+	 * @param message the usage error to report at the end of the help text
+	 * @return the generator description, the output options, and the usage error
+	 */
+	private static String getUsage(String message) {
+		// Wrap the long description up front so the assembly below stays one call per line.
+		String sizeNote = WordUtils.wrap("The graph matrix contains 2^scale vertices although not every vertex will" +
+			" be represented in an edge. The number of edges is edge_factor * 2^scale edges" +
+			" although some edges may be duplicates.", 80);
+
+		StrBuilder usage = new StrBuilder();
+
+		usage.appendNewLine();
+		usage.appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.");
+		usage.appendNewLine();
+		usage.appendln(sizeNote);
+		usage.appendNewLine();
+		usage.appendln("Note: this does not yet implement permutation of vertex labels or edges.");
+		usage.appendNewLine();
+
+		// Supported output modes.
+		usage.appendln("  --output print");
+		usage.appendln("  --output hash");
+		usage.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
+		usage.appendNewLine();
+
+		// The specific error goes last.
+		usage.appendln("Usage error: " + message);
+
+		return usage.toString();
+	}
+
+	/**
+	 * Driver entry point. Generates an RMat graph from the {@code --scale} and
+	 * {@code --edge_factor} options, optionally simplifies it, then prints,
+	 * hashes, or writes the edge list as CSV according to {@code --output},
+	 * and finally prints the net job runtime.
+	 *
+	 * @param args command-line options parsed by {@link ParameterTool}
+	 * @throws Exception a {@link ProgramParametrizationException} is thrown for an
+	 *         unrecognized {@code --output} value; other exceptions propagate
+	 *         from the calls below
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Generate RMat graph
+		int scale = parameters.getInt("scale", DEFAULT_SCALE);
+		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		// 2^scale vertices and edge_factor edges per vertex, as described in the usage text
+		long vertexCount = 1L << scale;
+		long edgeCount = vertexCount * edgeFactor;
+
+		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
+		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.generate();
+
+		if (simplify) {
+			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		}
+
+		// keep only the two edge endpoint fields (tuple fields 0 and 1)
+		DataSet<Tuple2<LongValue,LongValue>> edges = graph
+			.getEdges()
+			.project(0, 1);
+
+		// Print, hash, or write RMat graph to disk
+		switch (parameters.get("output", "")) {
+		case "print":
+			edges.print();
+			break;
+
+		case "hash":
+			System.out.println(DataSetUtils.checksumHashCode(edges));
+			break;
+
+		case "csv":
+			// The usage text documents --output_filename, but this branch used to read
+			// --filename. Read the documented option and fall back to the old key so
+			// existing invocations keep working.
+			String filename = parameters.get("output_filename", parameters.get("filename"));
+
+			// delimiters are given in Java-escaped form (e.g. "\t") and unescaped here
+			String lineDelimiter = StringEscapeUtils.unescapeJava(
+				parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+			String fieldDelimiter = StringEscapeUtils.unescapeJava(
+				parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+			edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+			// this is the only output branch that calls env.execute() itself
+			env.execute("Graph500");
+			break;
+		default:
+			throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
new file mode 100644
index 0000000..4fb11c3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Computes vertex and edge metrics on a directed or undirected graph.
+ *
+ * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
+ */
+public class GraphMetrics {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Builds the command-line help text for this driver, ending with the
+	 * specific usage error so the user can see what was wrong.
+	 *
+	 * @param message description of the usage error that triggered the help text
+	 * @return the driver description, option summary, and usage error
+	 */
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
+			.appendNewLine()
+			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			// Report the caller-supplied error; previously the message was silently dropped.
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	/**
+	 * Driver entry point. Parses the command-line options, builds the vertex
+	 * and edge metric analytics for the selected input graph, executes the job,
+	 * and prints both metric results followed by the net job runtime.
+	 *
+	 * @param args command-line options parsed by {@link ParameterTool}
+	 * @throws Exception a {@link ProgramParametrizationException} is thrown for a
+	 *         missing {@code --directed} flag or an unrecognized input or CSV
+	 *         type; other exceptions propagate from the calls below
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		// --directed has no default: the caller must pick the algorithm variant explicitly
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		// vertex and edge metric analytics
+		// (raw types: the ID type parameter differs between the branches below)
+		GraphAnalytic vm;
+		GraphAnalytic em;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				// delimiters are given in Java-escaped form (e.g. "\t") and unescaped here
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				// --type selects the vertex ID type used to read the edge list
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							// simplification of CSV input is opt-in (default false)
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+				} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				// the generator is asked for 2^scale vertices and edgeFactor edges per vertex
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				// Generated graphs are always simplified. For scale <= 32 the IDs are
+				// first translated from LongValue to IntValue.
+				// NOTE(review): presumably because 2^scale IDs then fit an unsigned 32-bit value - confirm
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				} else {
+					// only the undirected Simplify takes the clip-and-flip flag
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				}
+				} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		// execute the job before reading the analytic results below
+		env.execute("Graph Metrics");
+
+		// each ';' in the result string is replaced by a line break for printing
+		System.out.print("Vertex metrics:\n  ");
+		System.out.println(vm.getResult().toString().replace(";", "\n "));
+		System.out.print("\nEdge metrics:\n  ");
+		System.out.println(em.getResult().toString().replace(";", "\n "));
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
new file mode 100644
index 0000000..e0a233a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.directed.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.link_analysis.HITS.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of HITS (Hubs and Authorities).
+ *
+ * This example reads a simple, undirected graph from a CSV file or generates
+ * an undirected RMat graph with the given scale and edge factor then calculates
+ * hub and authority scores for each vertex.
+ *
+ * @see org.apache.flink.graph.library.link_analysis.HITS
+ */
+public class HITS {
+
+	private static final int DEFAULT_ITERATIONS = 10;
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
+				" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
+				" and good \"authorities\" are linked from good \"hubs\".", 80))
+			.appendNewLine()
+			.appendln("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
+
+		DataSet hits;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						hits = reader
+							.keyType(LongValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
+					} break;
+
+					case "string": {
+						hits = reader
+							.keyType(StringValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+				} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (scale > 32) {
+					hits = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>())
+						.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
+				} else {
+					hits = graph
+						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+						.run(new Simplify<IntValue, NullValue, NullValue>())
+						.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
+				}
+				} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				for (Object e: hits.collect()) {
+					System.out.println(((Result)e).toVerboseString());
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(hits));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute();
+				break;
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
new file mode 100644
index 0000000..5c173e0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example reads a simple, undirected graph from a CSV file or generates
+ * an undirected RMat graph with the given scale and edge factor then calculates
+ * all non-zero Jaccard Index similarity scores between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	/**
+	 * Builds the help text reported when the program arguments are invalid.
+	 *
+	 * @param message description of the specific usage error; printed after the option summary
+	 * @return the algorithm description and option summary followed by the usage error
+	 */
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
+				" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
+				" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
+				" shared).", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
+				" number of shared neighbors, and the number of distinct neighbors.", 80))
+			.appendNewLine()
+			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	/**
+	 * Parses the program arguments, builds the input graph, runs the Jaccard
+	 * Index algorithm, writes the scores to the selected output, and finally
+	 * prints the net runtime of the last executed job.
+	 *
+	 * @param args command-line arguments as described by the usage text
+	 * @throws Exception if an argument is invalid (reported as a
+	 *         ProgramParametrizationException carrying the usage text) or if
+	 *         any of the invoked Flink operations throws
+	 */
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Parallelism handed to the graph generator, the preprocessing steps, and the algorithm
+		int littleParallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
+		// Left raw because the element type varies with the vertex ID type selected below
+		DataSet ji;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (parameters.getBoolean("simplify", false)) {
+							graph = graph
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+						}
+
+						ji = graph
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(littleParallelism));
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						// Use the undirected simplification, exactly as the "integer" branch does;
+						// this driver operates on undirected graphs
+						if (parameters.getBoolean("simplify", false)) {
+							graph = graph
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+						}
+
+						ji = graph
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
+								.setLittleParallelism(littleParallelism));
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+				} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				// 2^scale vertices and edgeFactor generated edges per vertex
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.setParallelism(littleParallelism)
+					.generate();
+
+				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+				// Vertex IDs are translated to IntValue unless the scale exceeds 32
+				// NOTE(review): presumably because larger IDs no longer fit an unsigned 32-bit value - confirm
+				if (scale > 32) {
+					ji = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+							.setParallelism(littleParallelism))
+						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+							.setLittleParallelism(littleParallelism));
+				} else {
+					ji = graph
+						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+							.setParallelism(littleParallelism))
+						.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+							.setParallelism(littleParallelism))
+						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
+							.setLittleParallelism(littleParallelism));
+				}
+				} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				for (Object e: ji.collect()) {
+					Result result = (Result)e;
+					System.out.println(result.toVerboseString());
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(ji));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				// Only the CSV sink calls execute explicitly
+				env.execute("Jaccard Index");
+				break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
new file mode 100644
index 0000000..954f732
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Triangle Listing.
+ *
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor then lists
+ * all triangles.
+ *
+ * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ */
+public class TriangleListing {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Lists all triangles in a graph.", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
+				" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80))
+			.appendNewLine()
+			.appendln("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		DataSet tl;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+
+
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						tl = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+					} else {
+						tl = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
+					}
+				} else {
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					graph = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+
+					if (scale > 32) {
+						tl = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+					} else {
+						tl = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
+					}
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				if (directedAlgorithm) {
+					for (Object e: tl.collect()) {
+						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
+							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
+						System.out.println(result.toVerboseString());
+					}
+				} else {
+					tl.print();
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(tl));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute();
+				break;
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}