Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:28 UTC

[5/6] flink git commit: [FLINK-5913] [gelly] Example drivers

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 93a96c4..ca0c167 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -18,315 +18,125 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
+ * Driver for the directed and undirected triangle listing algorithms and analytics.
  *
  * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.directed.TriadicCensus
  * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
  */
-public class TriangleListing {
-
-	private static final int DEFAULT_SCALE = 10;
-
-	private static final int DEFAULT_EDGE_FACTOR = 16;
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-	private static final boolean DEFAULT_TRIADIC_CENSUS = true;
-
-	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("Lists all triangles in a graph.", 80))
-			.appendNewLine()
-			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
-				" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80))
-			.appendNewLine()
-			.appendln("usage: TriangleListing --directed <true | false> [--triadic_census <true | false>] --input <csv | rmat> --output <print | hash | csv>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
-	}
+	private static final String DIRECTED = "directed";
 
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
+	private static final String UNDIRECTED = "undirected";
 
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
+	private ChoiceParameter order = new ChoiceParameter(this, "order")
+		.addChoices(DIRECTED, UNDIRECTED);
 
-		if (! parameters.has("directed")) {
-			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
+	private BooleanParameter sortTriangleVertices = new BooleanParameter(this, "sort_triangle_vertices");
 
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-		boolean triadic_census = parameters.getBoolean("triadic_census", DEFAULT_TRIADIC_CENSUS);
+	private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census");
 
-		GraphAnalytic tc = null;
-		DataSet tl;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.getRequired("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-										.setParallelism(little_parallelism));
-							}
-
-							if (triadic_census) {
-								tc = graph
-									.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
-										.setLittleParallelism(little_parallelism));
-							}
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
-										.setParallelism(little_parallelism));
-							}
-
-							if (triadic_census) {
-								tc = graph
-									.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
-										.setLittleParallelism(little_parallelism));
-							}
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
-										.setParallelism(little_parallelism));
-							}
-
-							if (triadic_census) {
-								tc = graph
-									.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<StringValue, NullValue, NullValue>()
-										.setLittleParallelism(little_parallelism));
-							}
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
-										.setParallelism(little_parallelism));
-							}
-
-							if (triadic_census) {
-								tc = graph
-									.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<StringValue, NullValue, NullValue>()
-										.setLittleParallelism(little_parallelism));
-							}
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
 
+	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;
 
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						if (triadic_census) {
-							tc = simpleGraph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-						tl = simpleGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						if (triadic_census) {
-							tc = simpleGraph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-						tl = simpleGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
 
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
+	@Override
+	public String getShortDescription() {
+		return "list triangles";
+	}
 
-						if (triadic_census) {
-							tc = simpleGraph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-						tl = simpleGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
+	@Override
+	public String getLongDescription() {
+		return WordUtils.wrap(new StrBuilder()
+			.appendln("List all triangles in the graph.")
+			.appendNewLine()
+			.append("The algorithm result contains three vertex IDs. For the directed algorithm " +
+				"the result contains an additional bitmask indicating the presence of the six " +
+				"potential connecting edges.")
+			.toString(), 80);
+	}
 
-						if (triadic_census) {
-							tc = simpleGraph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<IntValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-						tl = simpleGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					}
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		int lp = littleParallelism.getValue().intValue();
+
+		switch (order.getValue()) {
+			case DIRECTED:
+				result = graph
+					.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
+						.setSortTriangleVertices(sortTriangleVertices.getValue())
+						.setLittleParallelism(lp));
+
+				if (computeTriadicCensus.getValue()) {
+					triadicCensus = graph
+						.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
+							.setLittleParallelism(lp));
 				}
-			} break;
+				break;
 
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
+			case UNDIRECTED:
+				result = graph
+					.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
+						.setSortTriangleVertices(sortTriangleVertices.getValue())
+						.setLittleParallelism(lp));
 
-		switch (parameters.get("output", "")) {
-			case "print":
-				System.out.println();
-				if (directedAlgorithm) {
-					for (Object e: tl.collect()) {
-						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
-							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-						System.out.println(result.toPrintableString());
-					}
-				} else {
-					tl.print();
+				if (computeTriadicCensus.getValue()) {
+					triadicCensus = graph
+						.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
+							.setLittleParallelism(lp));
 				}
 				break;
+		}
+	}
 
-			case "hash":
-				System.out.println();
-				System.out.println(DataSetUtils.checksumHashCode(tl));
-				break;
-
-			case "csv":
-				String filename = parameters.getRequired("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+	@Override
+	public void hash(String executionName) throws Exception {
+		super.hash(executionName);
+		printAnalytics();
+	}
 
-				tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+	@Override
+	public void print(String executionName) throws Exception {
+		super.print(executionName);
+		printAnalytics();
+	}
 
-				env.execute();
-				break;
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
+	@Override
+	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+		super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+		printAnalytics();
+	}
 
-		if (tc != null) {
+	private void printAnalytics() {
+		if (computeTriadicCensus.getValue()) {
 			System.out.print("Triadic census:\n  ");
-			System.out.println(tc.getResult().toString().replace(";", "\n "));
+			System.out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n "));
 		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }
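
The rewritten driver delegates to the Gelly library algorithms it configures above. As a point of reference, here is a minimal, self-contained sketch (not part of this commit; the class name and RMat generation are illustrative) of calling the undirected library algorithm directly, mirroring what plan() wires up for the "undirected" choice:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class TriangleListingSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// generate a small RMat graph (2^10 vertices, edge factor 16), as the removed code did
		long vertexCount = 1L << 10;

		Graph<LongValue, NullValue, NullValue> graph =
			new RMatGraph<>(env, new JDKRandomGeneratorFactory(), vertexCount, 16 * vertexCount)
				.generate()
				// the triangle listing algorithms expect a simple graph
				.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));

		// list triangles with sorted vertex IDs, as the driver configures for "undirected"
		graph
			.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
				.setSortTriangleVertices(true))
			.print();
	}
}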

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
new file mode 100644
index 0000000..e9d648a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence.Value;
+
+/**
+ * Iterative algorithms which converge can be terminated with a maximum number
+ * of iterations or a convergence threshold which stops computation when the
+ * total change in scores is below a given delta.
+ *
+ * If the command-line configuration specifies neither a number of iterations
+ * nor a convergence threshold then a default number of iterations is used
+ * with an infinite convergence threshold. Otherwise, any value left unset
+ * defaults to infinity.
+ */
+public class IterationConvergence
+implements Parameter<Value> {
+
+	private final int defaultIterations;
+
+	private final Value value = new Value();
+
+	/**
+	 * Add this parameter to the list of parameters stored by owner.
+	 *
+	 * @param owner the {@link Parameterized} using this {@link Parameter}
+	 * @param defaultIterations the default number of iterations if neither
+	 *                          the number of iterations nor the convergence
+	 *                          threshold are specified
+	 */
+	public IterationConvergence(ParameterizedBase owner, int defaultIterations) {
+		owner.addParameter(this);
+		this.defaultIterations = defaultIterations;
+	}
+
+	@Override
+	public String getUsage() {
+		return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD]";
+	}
+
+	@Override
+	public void configure(ParameterTool parameterTool) {
+		if (!parameterTool.has("iterations") && !parameterTool.has("convergence_threshold")) {
+			// no configuration so use default iterations and maximum threshold
+			value.iterations = defaultIterations;
+			value.convergenceThreshold = Double.MAX_VALUE;
+		} else {
+			// use configured values and maximum default for unset values
+			value.iterations = parameterTool.getInt("iterations", Integer.MAX_VALUE);
+			Util.checkParameter(value.iterations > 0,
+				"iterations must be greater than zero");
+
+			value.convergenceThreshold = parameterTool.getDouble("convergence_threshold", Double.MAX_VALUE);
+			Util.checkParameter(value.convergenceThreshold > 0,
+				"convergence threshold must be greater than zero");
+		}
+	}
+
+	@Override
+	public Value getValue() {
+		return value;
+	}
+
+	/**
+	 * Encapsulate the number of iterations and the convergence threshold.
+	 */
+	public static class Value {
+		public int iterations;
+		public double convergenceThreshold;
+	}
+}
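
A short usage sketch (class and method names are illustrative; the owner is whichever ParameterizedBase declares the parameter), mirroring the unit test further below:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.drivers.parameter.IterationConvergence;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;

public class IterationConvergenceSketch {

	static void example(ParameterizedBase owner) {
		// default to 10 iterations when neither option is given on the command line
		IterationConvergence convergence = new IterationConvergence(owner, 10);

		// only the threshold is given, so the iteration count falls back to "infinity"
		convergence.configure(ParameterTool.fromArgs(
			new String[]{"--convergence_threshold", "0.01"}));

		int iterations = convergence.getValue().iterations;             // Integer.MAX_VALUE
		double threshold = convergence.getValue().convergenceThreshold; // 0.01
	}
}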

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
deleted file mode 100644
index 6651739..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in {@link org.apache.flink.graph.library}. 
- * 
- * In particular, this example uses the {@link GSAConnectedComponents}
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link ConnectedComponentsDefaultData}
- */
-public class ConnectedComponents implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		}, env);
-
-		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-				.run(new GSAConnectedComponents<Long, Long, NullValue>(maxIterations));
-
-		// emit result
-		if (fileOutput) {
-			verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("Connected Components Example");
-		} else {
-			verticesWithMinIds.print();
-		}
-	}
-
-	@Override
-	public String getDescription() {
-		return "Connected Components Example";
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-						"<num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-
-		} else {
-			System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-					"<num iterations>");
-		}
-
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		} else {
-			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
index 35f07b0..1cd3549 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -114,7 +114,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double gather(Neighbor<Double, Double> neighbor) {
 			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -122,7 +122,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
new file mode 100644
index 0000000..ae92943
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IterationConvergenceTest
+extends ParameterTestBase {
+
+	private IterationConvergence parameter;
+
+	@Before
+	public void setup() {
+		super.setup();
+
+		parameter = new IterationConvergence(owner, 10);
+	}
+
+	@Test
+	public void testWithIterations() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42"}));
+		Assert.assertEquals(42, parameter.getValue().iterations);
+		Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001);
+	}
+
+	@Test
+	public void testWithConvergenceThreshold() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--convergence_threshold", "42"}));
+		Assert.assertEquals(Integer.MAX_VALUE, parameter.getValue().iterations);
+		Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001);
+	}
+
+	@Test
+	public void testWithBoth() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42", "--convergence_threshold", "42"}));
+		Assert.assertEquals(42, parameter.getValue().iterations);
+		Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001);
+	}
+
+	@Test
+	public void testWithNeither() {
+		parameter.configure(ParameterTool.fromArgs(new String[]{}));
+		Assert.assertEquals(10, parameter.getValue().iterations);
+		Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
deleted file mode 100644
index d0de8dc..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.examples;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.examples.ConnectedComponents;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public ConnectedComponentsITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testConnectedComponentsExample() throws Exception {
-		ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
-		expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
-	}
-
-	@After
-	public void after() throws Exception {
-		TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index cbbfb02..71baaa9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -529,7 +529,13 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
+		TypeInformation<NV> valueType;
+
+		if (mapper instanceof ResultTypeQueryable) {
+			valueType = ((ResultTypeQueryable) mapper).getProducedType();
+		} else {
+			valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
+		}
 
 		TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
 				Vertex.class, keyType, valueType);
@@ -573,7 +579,13 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
+		TypeInformation<NV> valueType;
+
+		if (mapper instanceof ResultTypeQueryable) {
+			valueType = ((ResultTypeQueryable) mapper).getProducedType();
+		} else {
+			valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
+		}
 
 		TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
 				Edge.class, keyType, keyType, valueType);
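
With this change mapVertices and mapEdges prefer a type declared by the mapper itself over type extraction. A sketch (the class name and NullValue output are illustrative, not from the commit) of a mapper that benefits, since erasure would otherwise hide its output type:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.NullValue;

public class ToNullValue<T> implements MapFunction<T, NullValue>, ResultTypeQueryable<NullValue> {

	@Override
	public NullValue map(T value) {
		return NullValue.getInstance();
	}

	@Override
	public TypeInformation<NullValue> getProducedType() {
		// declare the output type explicitly rather than relying on the TypeExtractor
		return ValueTypeInfo.NULL_VALUE_TYPE_INFO;
	}
}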

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 3cd8f05..959b816 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -72,7 +73,7 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
 
 		Graph<K, VV, NullValue> undirectedGraph = graph
-			.mapEdges(new NullValueEdgeMapper<K, EV>())
+			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
 			.getUndirected();
 
 		return undirectedGraph.runScatterGatherIteration(

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 327de73..1680f38 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -29,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -73,7 +74,7 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
 
 		Graph<K, VV, NullValue> undirectedGraph = graph
-			.mapEdges(new NullValueEdgeMapper<K, EV>())
+			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
 			.getUndirected();
 
 		return undirectedGraph.runGatherSumApplyIteration(
@@ -87,7 +88,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
 	//  Connected Components UDFs
 	// --------------------------------------------------------------------------------------------
 
-	@SuppressWarnings("serial")
 	private static final class GatherNeighborIds<VV extends Comparable<VV>>
 		extends GatherFunction<VV, NullValue, VV>
 		implements ResultTypeQueryable<VV> {
@@ -108,7 +108,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
 		}
 	}
 
-	@SuppressWarnings("serial")
 	private static final class SelectMinId<VV extends Comparable<VV>>
 		extends SumFunction<VV, NullValue, VV>
 		implements ResultTypeQueryable<VV> {
@@ -129,7 +128,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
 		}
 	}
 
-	@SuppressWarnings("serial")
 	private static final class UpdateComponentId<K, VV extends Comparable<VV>>
 		extends ApplyFunction<K, VV, VV>
 		implements ResultTypeQueryable<VV> {

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 96e5afc..0064a68 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
 import org.apache.flink.types.NullValue;
 
 import java.util.HashMap;
@@ -76,7 +77,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 		TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
 		// iteratively adopt the most frequent label among the neighbors of each vertex
 		return input
-			.mapEdges(new NullValueEdgeMapper<K, EV>())
+			.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
 			.runScatterGatherIteration(
 				new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
 			.getVertices();

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 236272f..6fe753a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -35,9 +35,9 @@ import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 9aca8a4..eda5c1c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -32,8 +32,8 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index 57743e8..1dfa3ee 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -40,6 +40,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.PageRank.Result;
 import org.apache.flink.graph.utils.GraphUtils;
@@ -500,7 +502,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Tuple2<T, DoubleValue> {
+	extends Tuple2<T, DoubleValue>
+	implements PrintableResult, UnaryResult<T> {
 		public static final int HASH_SEED = 0x4010af29;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
@@ -518,7 +521,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return f1;
 		}
 
-		public String toVerboseString() {
+		@Override
+		public String toPrintableString() {
 			return "Vertex ID: " + getVertexId0()
 				+ ", PageRank score: " + getPageRankScore();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 7d77541..6aaf9f2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -34,8 +34,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 3b36715..0c80e6d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 2e0dffc..78fb378 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -20,7 +20,10 @@ package org.apache.flink.graph.utils;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.LongValue;
 
 import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
@@ -62,7 +65,7 @@ public class GraphUtils {
 	 * @param <O> output type
 	 */
 	public static class MapTo<I, O>
-	implements MapFunction<I, O> {
+	implements MapFunction<I, O>, ResultTypeQueryable<O> {
 		private final O value;
 
 		/**
@@ -78,6 +81,11 @@ public class GraphUtils {
 		public O map(I o) throws Exception {
 			return value;
 		}
+
+		@Override
+		public TypeInformation<O> getProducedType() {
+			return (TypeInformation<O>)TypeExtractor.createTypeInfo(value.getClass());
+		}
 	}
 
 	/**
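
Because MapTo now reports its produced type, the updated ConnectedComponents, GSAConnectedComponents, and LabelPropagation above can drop edge values without a dedicated mapper class. A small sketch of the same pattern (class and method names are illustrative):

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.utils.GraphUtils.MapTo;
import org.apache.flink.types.NullValue;

public class StripEdgeValues {

	// replace every edge value with NullValue; MapTo#getProducedType() lets mapEdges
	// infer the new edge type without an explicit type hint
	static <K, VV, EV> Graph<K, VV, NullValue> apply(Graph<K, VV, EV> graph) {
		return graph.mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()));
	}
}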

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
deleted file mode 100644
index 2bd4719..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class NullValueEdgeMapper<K, EV> implements	MapFunction<Edge<K, EV>, NullValue> {
-
-	private static final long serialVersionUID = 1L;
-
-	public NullValue map(Edge<K, EV> edge) {
-		return NullValue.getInstance();
-	}
-}