You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/09 15:11:26 UTC

[2/2] flink git commit: [FLINK-5909] [gelly] Interface for GraphAlgorithm results

[FLINK-5909] [gelly] Interface for GraphAlgorithm results

Create PrintableResult interface for library algorithms and analytics
containing a toPrintableString method used by drivers to print
human-readable results to stdout.

Also create interfaces for UnaryResult, BinaryResult, and TertiaryResult
implementing methods to access the 0th, 1st, and 2nd vertices.

This closes #3434


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33cd9795
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33cd9795
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33cd9795

Branch: refs/heads/pr3434
Commit: 33cd97953a7943437acb925ad3296a1eb9858c73
Parents: 2592a19
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Feb 28 13:10:20 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Mar 9 09:15:42 2017 -0500

----------------------------------------------------------------------
 .../graph/drivers/ClusteringCoefficient.java    |  4 +-
 .../org/apache/flink/graph/drivers/HITS.java    |  2 +-
 .../flink/graph/drivers/JaccardIndex.java       |  2 +-
 .../flink/graph/drivers/TriangleListing.java    |  2 +-
 .../main/java/org/apache/flink/graph/Graph.java |  8 +--
 .../graph/asm/dataset/ChecksumHashCode.java     |  8 +++
 .../apache/flink/graph/asm/dataset/Collect.java |  5 ++
 .../apache/flink/graph/asm/dataset/Count.java   |  5 ++
 .../flink/graph/asm/result/BinaryResult.java    | 41 +++++++++++++
 .../flink/graph/asm/result/PrintableResult.java | 34 +++++++++++
 .../flink/graph/asm/result/TertiaryResult.java  | 48 +++++++++++++++
 .../flink/graph/asm/result/UnaryResult.java     | 34 +++++++++++
 .../graph/asm/simple/directed/Simplify.java     |  3 +-
 .../graph/asm/simple/undirected/Simplify.java   |  3 +-
 .../flink/graph/generator/CompleteGraph.java    |  8 ++-
 .../flink/graph/generator/CycleGraph.java       |  8 ++-
 .../flink/graph/generator/EmptyGraph.java       |  8 ++-
 .../apache/flink/graph/generator/GridGraph.java |  5 +-
 .../flink/graph/generator/HypercubeGraph.java   |  8 ++-
 .../apache/flink/graph/generator/PathGraph.java |  8 ++-
 .../apache/flink/graph/generator/RMatGraph.java | 33 ++++++-----
 .../graph/generator/SingletonEdgeGraph.java     |  8 ++-
 .../apache/flink/graph/generator/StarGraph.java |  8 ++-
 .../directed/AverageClusteringCoefficient.java  |  8 ++-
 .../directed/GlobalClusteringCoefficient.java   |  8 ++-
 .../directed/LocalClusteringCoefficient.java    | 44 +++++++-------
 .../clustering/directed/TriadicCensus.java      |  6 +-
 .../clustering/directed/TriangleListing.java    | 58 +++++++++++++------
 .../AverageClusteringCoefficient.java           |  8 ++-
 .../undirected/GlobalClusteringCoefficient.java | 13 +++--
 .../undirected/LocalClusteringCoefficient.java  | 46 ++++++++-------
 .../clustering/undirected/TriadicCensus.java    |  7 ++-
 .../clustering/undirected/TriangleListing.java  | 61 +++++++++++++++++---
 .../flink/graph/library/link_analysis/HITS.java | 31 +++++-----
 .../graph/library/link_analysis/PageRank.java   |  2 +-
 .../library/metric/directed/EdgeMetrics.java    |  6 +-
 .../library/metric/directed/VertexMetrics.java  | 20 ++++---
 .../library/metric/undirected/EdgeMetrics.java  |  6 +-
 .../metric/undirected/VertexMetrics.java        | 20 ++++---
 .../graph/library/similarity/AdamicAdar.java    | 41 +++++++++----
 .../graph/library/similarity/JaccardIndex.java  | 53 ++++++++++++-----
 .../apache/flink/graph/utils/GraphUtils.java    |  4 +-
 .../LocalClusteringCoefficientTest.java         | 12 ++--
 .../LocalClusteringCoefficientTest.java         | 12 ++--
 .../undirected/TriangleListingTest.java         | 12 ++--
 .../library/similarity/JaccardIndexTest.java    | 44 +++++++-------
 .../graph/test/operations/DegreesITCase.java    |  2 -
 47 files changed, 574 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 79a17a4..004390d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -306,13 +306,13 @@ public class ClusteringCoefficient {
 					for (Object e: lcc.collect()) {
 						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
 							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
+						System.out.println(result.toPrintableString());
 					}
 				} else {
 					for (Object e: lcc.collect()) {
 						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
 							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
+						System.out.println(result.toPrintableString());
 					}
 				}
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index 453b543..db27f0e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -159,7 +159,7 @@ public class HITS {
 			case "print":
 				System.out.println();
 				for (Object e: hits.collect()) {
-					System.out.println(((Result)e).toVerboseString());
+					System.out.println(((Result)e).toPrintableString());
 				}
 				break;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index abb675a..09479a6 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -192,7 +192,7 @@ public class JaccardIndex {
 				System.out.println();
 				for (Object e: ji.collect()) {
 					Result result = (Result)e;
-					System.out.println(result.toVerboseString());
+					System.out.println(result.toPrintableString());
 				}
 				break;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 1fecc3d..93a96c4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -289,7 +289,7 @@ public class TriangleListing {
 					for (Object e: tl.collect()) {
 						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
 							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-						System.out.println(result.toVerboseString());
+						System.out.println(result.toPrintableString());
 					}
 				} else {
 					tl.print();

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index dae7a11..cbbfb02 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1126,7 +1126,7 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-	private static final class ProjectVertexWithEdgeValueMap<K, EV>	implements MapFunction<
+	private static final class ProjectVertexWithEdgeValueMap<K, EV> implements MapFunction<
 		Edge<K, EV>, Tuple2<K, EV>> {
 
 		private int fieldPosition;
@@ -1217,7 +1217,7 @@ public class Graph<K, VV, EV> {
 			this.function = fun;
 		}
 
-		public void coGroup(Iterable<Vertex<K, VV>> vertex, 	final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
+		public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
 				Collector<T> out) throws Exception {
 
 			final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K, EV>>() {
@@ -2149,12 +2149,12 @@ public class Graph<K, VV, EV> {
 
 		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
 				Collector<T> out) throws Exception {
-			function.iterateNeighbors(vertex.iterator().next(), 	neighbors, out);
+			function.iterateNeighbors(vertex.iterator().next(), neighbors, out);
 		}
 
 		@Override
 		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 	function.getClass(), 3, null, null);
+			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
index 13db7a0..1f8fe99 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
@@ -60,6 +60,11 @@ extends AbstractDataSetAnalytic<T, Checksum> {
 		return checksumHashCodeHelper.getAccumulator(env, CHECKSUM);
 	}
 
+	/**
+	 * Helper class to count elements and sum element hashcodes.
+	 *
+	 * @param <U> element type
+	 */
 	private static class ChecksumHashCodeHelper<U>
 	extends AnalyticHelper<U> {
 		private long count;
@@ -78,6 +83,9 @@ extends AbstractDataSetAnalytic<T, Checksum> {
 		}
 	}
 
+	/**
+	 * Wraps checksum and count.
+	 */
 	public static class Checksum
 	implements SimpleAccumulator<Checksum> {
 		private long count;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
index 4398296..771a044 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -75,6 +75,11 @@ extends AbstractDataSetAnalytic<T, List<T>> {
 		}
 	}
 
+	/**
+	 * Helper class to collect elements into a serialized list.
+	 *
+	 * @param <U> element type
+	 */
 	private static class CollectHelper<U>
 	extends AnalyticHelper<U> {
 		private SerializedListAccumulator<U> accumulator;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
index 7303d3a..7bc97d5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
@@ -55,6 +55,11 @@ extends AbstractDataSetAnalytic<T, Long> {
 		return countHelper.getAccumulator(env, COUNT);
 	}
 
+	/**
+	 * Helper class to count elements.
+	 *
+	 * @param <U> element type
+	 */
 	private static class CountHelper<U>
 	extends AnalyticHelper<U> {
 		private long count;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
new file mode 100644
index 0000000..06a3d24
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+import org.apache.flink.graph.GraphAlgorithm;
+
+/**
+ * A {@link GraphAlgorithm} result for a pair vertices.
+ */
+public interface BinaryResult<T> {
+
+	/**
+	 * Get the first vertex ID.
+	 *
+	 * @return first vertex ID
+	 */
+	T getVertexId0();
+
+	/**
+	 * Get the second vertex ID.
+	 *
+	 * @return second vertex ID
+	 */
+	T getVertexId1();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/PrintableResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/PrintableResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/PrintableResult.java
new file mode 100644
index 0000000..39c076d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/PrintableResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+import org.apache.flink.graph.GraphAlgorithm;
+
+/**
+ * Base interface for {@link GraphAlgorithm} results.
+ */
+public interface PrintableResult {
+
+	/**
+	 * A human-readable representation of this value.
+	 *
+	 * @return printable string
+	 */
+	String toPrintableString();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
new file mode 100644
index 0000000..c41c9af
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+import org.apache.flink.graph.GraphAlgorithm;
+
+/**
+ * A {@link GraphAlgorithm} result for three vertices.
+ */
+public interface TertiaryResult<T> {
+
+	/**
+	 * Get the first vertex ID.
+	 *
+	 * @return first vertex ID
+	 */
+	T getVertexId0();
+
+	/**
+	 * Get the second vertex ID.
+	 *
+	 * @return second vertex ID
+	 */
+	T getVertexId1();
+
+	/**
+	 * Get the third vertex ID.
+	 *
+	 * @return third vertex ID
+	 */
+	T getVertexId2();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
new file mode 100644
index 0000000..8fd6889
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+import org.apache.flink.graph.GraphAlgorithm;
+
+/**
+ * A {@link GraphAlgorithm} result for a single vertex.
+ */
+public interface UnaryResult<T> {
+
+	/**
+	 * Get the first vertex ID.
+	 *
+	 * @return first vertex ID
+	 */
+	T getVertexId0();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 3d1fcee..15c8359 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
-import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -35,7 +34,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class Simplify<K extends Comparable<K>, VV, EV>
 extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
 	// Optional configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index c3d8983..a3c007e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
-import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -37,7 +36,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class Simplify<K extends Comparable<K>, VV, EV>
 extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
 	// Required configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index a4996ab..dfa7eb2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -29,6 +29,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
 
 /*
  * @see <a href="http://mathworld.wolfram.com/CompleteGraph.html">Complete Graph at Wolfram MathWorld</a>
@@ -36,6 +37,8 @@ import org.apache.flink.util.LongValueSequenceIterator;
 public class CompleteGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -49,9 +52,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexCount number of vertices
 	 */
 	public CompleteGraph(ExecutionEnvironment env, long vertexCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
 		this.env = env;
 		this.vertexCount = vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index ce8b467..b04d78c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
 
 /*
  * @see <a href="http://mathworld.wolfram.com/CycleGraph.html">Cycle Graph at Wolfram MathWorld</a>
@@ -29,6 +30,8 @@ import org.apache.flink.types.NullValue;
 public class CycleGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -42,9 +45,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexCount number of vertices
 	 */
 	public CycleGraph(ExecutionEnvironment env, long vertexCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
 		this.env = env;
 		this.vertexCount = vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 7ec368b..25584ea 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 
@@ -38,6 +39,8 @@ import java.util.Collections;
 public class EmptyGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 1;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -51,9 +54,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexCount number of vertices
 	 */
 	public EmptyGraph(ExecutionEnvironment env, long vertexCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
 		this.env = env;
 		this.vertexCount = vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 74ea764..0ca804e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -30,6 +30,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -67,9 +68,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @return this
 	 */
 	public GridGraph addDimension(long size, boolean wrapEndpoints) {
-		if (size <= 1) {
-			throw new IllegalArgumentException("Dimension size must be greater than 1");
-		}
+		Preconditions.checkArgument(size >= 2, "Dimension size must be at least 2");
 
 		vertexCount *= size;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
index 40968a0..37590ff 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
 
 /*
  * @see <a href="http://mathworld.wolfram.com/HypercubeGraph.html">Hypercube Graph at Wolfram MathWorld</a>
@@ -29,6 +30,8 @@ import org.apache.flink.types.NullValue;
 public class HypercubeGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_DIMENSIONS = 1;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -42,9 +45,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param dimensions number of dimensions
 	 */
 	public HypercubeGraph(ExecutionEnvironment env, long dimensions) {
-		if (dimensions <= 0) {
-			throw new IllegalArgumentException("Number of dimensions must be greater than zero");
-		}
+		Preconditions.checkArgument(dimensions >= MINIMUM_DIMENSIONS,
+			"Number of dimensions must be at least " + MINIMUM_DIMENSIONS);
 
 		this.env = env;
 		this.dimensions = dimensions;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index db5e6bf..dcc4c98 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
 
 /*
  * @see <a href="http://mathworld.wolfram.com/PathGraph.html">Path Graph at Wolfram MathWorld</a>
@@ -29,6 +30,8 @@ import org.apache.flink.types.NullValue;
 public class PathGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -42,9 +45,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexCount number of vertices
 	 */
 	public PathGraph(ExecutionEnvironment env, long vertexCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
 		this.env = env;
 		this.vertexCount = vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 2a80a37..95a4f85 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -30,6 +30,7 @@ import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
@@ -39,6 +40,10 @@ import java.util.List;
 public class RMatGraph<T extends RandomGenerator>
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 1;
+
+	public static final int MINIMUM_EDGE_COUNT = 1;
+
 	// Default RMat constants
 	public static final float DEFAULT_A = 0.57f;
 
@@ -59,15 +64,15 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private final long edgeCount;
 
 	// Optional configuration
-	private float A = DEFAULT_A;
+	public float A = DEFAULT_A;
 
-	private float B = DEFAULT_B;
+	public float B = DEFAULT_B;
 
-	private float C = DEFAULT_C;
+	public float C = DEFAULT_C;
 
 	private boolean noiseEnabled = false;
 
-	private float noise = DEFAULT_NOISE;
+	public float noise = DEFAULT_NOISE;
 
 	/**
 	 * Generate a directed or undirected power-law {@link Graph} using the
@@ -79,13 +84,11 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param edgeCount number of edges
 	 */
 	public RMatGraph(ExecutionEnvironment env, RandomGenerableFactory<T> randomGeneratorFactory, long vertexCount, long edgeCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
-		if (edgeCount <= 0) {
-			throw new IllegalArgumentException("Edge count must be greater than zero");
-		}
+		Preconditions.checkArgument(edgeCount >= MINIMUM_EDGE_COUNT,
+			"Edge count must be at least " + MINIMUM_EDGE_COUNT);
 
 		this.env = env;
 		this.randomGenerableFactory = randomGeneratorFactory;
@@ -106,9 +109,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @return this
 	 */
 	public RMatGraph<T> setConstants(float A, float B, float C) {
-		if (A < 0.0f || B < 0.0f || C < 0.0f || A + B + C > 1.0f) {
-			throw new RuntimeException("RMat parameters A, B, and C must be non-negative and sum to less than or equal to one");
-		}
+		Preconditions.checkArgument(A >= 0.0f && B >= 0.0f && C >= 0.0f && A + B + C <= 1.0f,
+			"RMat parameters A, B, and C must be non-negative and sum to less than or equal to one");
 
 		this.A = A;
 		this.B = B;
@@ -128,9 +130,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @return this
 	 */
 	public RMatGraph<T> setNoise(boolean noiseEnabled, float noise) {
-		if (noise < 0.0f || noise > 2.0f) {
-			throw new RuntimeException("RMat parameter noise must be non-negative and less than or equal to 2.0");
-		}
+		Preconditions.checkArgument(noise >= 0.0f && noise <= 2.0f,
+			"RMat parameter noise must be non-negative and less than or equal to 2.0");
 
 		this.noiseEnabled = noiseEnabled;
 		this.noise = noise;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index 2eef7ae..f3c087e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A singleton-edge {@link Graph} contains one or more isolated two-paths. The in- and out-degree
@@ -36,6 +37,8 @@ import org.apache.flink.util.LongValueSequenceIterator;
 public class SingletonEdgeGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_PAIR_COUNT = 1;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -49,9 +52,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexPairCount number of pairs of vertices
 	 */
 	public SingletonEdgeGraph(ExecutionEnvironment env, long vertexPairCount) {
-		if (vertexPairCount <= 0) {
-			throw new IllegalArgumentException("Vertex pair count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexPairCount >= MINIMUM_VERTEX_PAIR_COUNT,
+			"Vertex pair count must be at least " + MINIMUM_VERTEX_PAIR_COUNT);
 
 		this.env = env;
 		this.vertexPairCount = vertexPairCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index a47ae4d..6c7c433 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -29,6 +29,7 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
 
 /*
  * @see <a href="http://mathworld.wolfram.com/StarGraph.html">Star Graph at Wolfram MathWorld</a>
@@ -36,6 +37,8 @@ import org.apache.flink.util.LongValueSequenceIterator;
 public class StarGraph
 extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
@@ -49,9 +52,8 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	 * @param vertexCount number of vertices
 	 */
 	public StarGraph(ExecutionEnvironment env, long vertexCount) {
-		if (vertexCount <= 0) {
-			throw new IllegalArgumentException("Vertex count must be greater than zero");
-		}
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
 
 		this.env = env;
 		this.vertexCount = vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index c0a80d1..46bed68 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
 import org.apache.flink.types.CopyableValue;
 
@@ -129,7 +130,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps global clustering coefficient metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long vertexCount;
 		private double averageLocalClusteringCoefficient;
 
@@ -164,7 +166,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			return "vertex count: " + vertexCount
 				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index 9e0b203..20a6ec5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -23,8 +23,9 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
 import org.apache.flink.graph.asm.dataset.Count;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient.Result;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 
@@ -101,7 +102,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps global clustering coefficient metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long tripletCount;
 
 		private long triangleCount;
@@ -150,7 +152,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			return "triplet count: " + tripletCount
 				+ ", triangle count: " + triangleCount
 				+ ", global clustering coefficient: " + getGlobalClusteringCoefficientScore();

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index ffd4b13..a973a2d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -23,16 +23,21 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -207,7 +212,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0")
+	@ForwardedFields("0")
 	private static class CountTriangles<T>
 	implements ReduceFunction<Tuple2<T, LongValue>> {
 		@Override
@@ -223,8 +228,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFieldsFirst("0; 1.0->1.0")
-	@FunctionAnnotation.ForwardedFieldsSecond("0")
+	@ForwardedFieldsFirst("0; 1.0->1")
+	@ForwardedFieldsSecond("0")
 	private static class JoinVertexDegreeWithTriangleCount<T>
 	implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
 		private LongValue zero = new LongValue(0);
@@ -235,29 +240,28 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
 				throws Exception {
 			output.f0 = vertexAndDegree.f0;
-			output.f1.f0 = vertexAndDegree.f1.f0;
-			output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+			output.f1 = vertexAndDegree.f1.f0;
+			output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
 
 			return output;
 		}
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
+	 * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Vertex<T, Tuple2<LongValue, LongValue>> {
+	extends Tuple3<T, LongValue, LongValue>
+	implements PrintableResult, UnaryResult<T> {
 		public static final int HASH_SEED = 0x37a208c4;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
 
-		/**
-		 * No-args constructor.
-		 */
-		public Result() {
-			f1 = new Tuple2<>();
+		@Override
+		public T getVertexId0() {
+			return f0;
 		}
 
 		/**
@@ -266,7 +270,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return vertex degree
 		 */
 		public LongValue getDegree() {
-			return f1.f0;
+			return f1;
 		}
 
 		/**
@@ -276,7 +280,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return triangle count
 		 */
 		public LongValue getTriangleCount() {
-			return f1.f1;
+			return f2;
 		}
 
 		/**
@@ -301,8 +305,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 *
 		 * @return verbose string
 		 */
-		public String toVerboseString() {
-			return "Vertex ID: " + f0
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
 				+ ", vertex degree: " + getDegree()
 				+ ", triangle count: " + getTriangleCount()
 				+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
@@ -312,8 +316,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public int hashCode() {
 			return hasher.reset()
 				.hash(f0.hashCode())
-				.hash(f1.f0.getValue())
-				.hash(f1.f1.getValue())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
 				.hash();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 2274e3e..5f28605 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -27,6 +27,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.directed.TriadicCensus.Result;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
@@ -321,7 +322,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps triadic census metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private final BigInteger[] counts;
 
 		public Result(BigInteger... counts) {
@@ -518,7 +520,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
 			return "003: " + nf.format(getCount003())

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 6b3e2a1..236272f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -36,6 +36,8 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.TertiaryResult;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
@@ -69,7 +71,7 @@ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
-	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
+	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
 
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -351,7 +353,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	@ForwardedFieldsSecond("0; 1")
 	private static final class ProjectTriangles<T>
 	implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> {
-		private Result<T> output = new Result<>(null, null, null, new ByteValue());
+		private Result<T> output = new Result<>();
 
 		@Override
 		public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge)
@@ -407,27 +409,45 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the Triangle Listing algorithm.
+	 * Wraps {@link Tuple4} to encapsulate results from the directed Triangle Listing algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Tuple4<T, T, T, ByteValue> {
+	extends Tuple4<T, T, T, ByteValue>
+	implements PrintableResult, TertiaryResult<T> {
 		/**
 		 * No-args constructor.
 		 */
-		public Result() {}
+		public Result() {
+			f3 = new ByteValue();
+		}
+
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public T getVertexId1() {
+			return f1;
+		}
+
+		@Override
+		public T getVertexId2() {
+			return f2;
+		}
 
 		/**
-		 * Populates parent tuple with constructor parameters.
+		 * Get the bitmask indicating the presence of the six potential
+		 * connecting edges.
+		 *
+		 * @return the edge bitmask
 		 *
-		 * @param value0 1st triangle vertex ID
-		 * @param value1 2nd triangle vertex ID
-		 * @param value2 3rd triangle vertex ID
-		 * @param value3 bitmask indicating presence of six possible edges between triangle vertices
+		 * @see EdgeOrder
 		 */
-		public Result(T value0, T value1, T value2, ByteValue value3) {
-			super(value0, value1, value2, value3);
+		public ByteValue getBitmask() {
+			return f3;
 		}
 
 		/**
@@ -435,15 +455,15 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 *
 		 * @return verbose string
 		 */
-		public String toVerboseString() {
+		public String toPrintableString() {
 			byte bitmask = f3.getValue();
 
-			return "1st vertex ID: " + f0
-				+ ", 2nd vertex ID: " + f1
-				+ ", 3rd vertex ID: " + f2
-				+ ", edge directions: " + f0 + maskToString(bitmask, 4) + f1
-				+ ", " + f0 + maskToString(bitmask, 2) + f2
-				+ ", " + f1 + maskToString(bitmask, 0) + f2;
+			return "1st vertex ID: " + getVertexId0()
+				+ ", 2nd vertex ID: " + getVertexId1()
+				+ ", 3rd vertex ID: " + getVertexId2()
+				+ ", edge directions: " + getVertexId0() + maskToString(bitmask, 4) + getVertexId1()
+				+ ", " + getVertexId0() + maskToString(bitmask, 2) + getVertexId2()
+				+ ", " + getVertexId1() + maskToString(bitmask, 0) + getVertexId2();
 		}
 
 		private String maskToString(byte mask, int shift) {

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index 3d4a88e..e01892b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
 import org.apache.flink.types.CopyableValue;
 
@@ -129,7 +130,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps global clustering coefficient metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long vertexCount;
 		private double averageLocalClusteringCoefficient;
 
@@ -164,7 +166,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			return "vertex count: " + vertexCount
 				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index b24155b..2eac620 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -21,11 +21,11 @@ package org.apache.flink.graph.library.clustering.undirected;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
 import org.apache.flink.graph.asm.dataset.Count;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 
@@ -42,7 +42,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
-	private Count<Tuple3<K, K, K>> triangleCount;
+	private Count<TriangleListing.Result<K>> triangleCount;
 
 	private VertexMetrics<K, VV, EV> vertexMetrics;
 
@@ -75,7 +75,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		triangleCount = new Count<>();
 
-		DataSet<Tuple3<K, K, K>> triangles = input
+		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
 				.setLittleParallelism(littleParallelism));
@@ -101,7 +101,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps global clustering coefficient metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long tripletCount;
 
 		private long triangleCount;
@@ -150,7 +151,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			return "triplet count: " + tripletCount
 				+ ", triangle count: " + triangleCount
 				+ ", global clustering coefficient: " + getGlobalClusteringCoefficientScore();

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 31ddf45..9aca8a4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -23,16 +23,20 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
+import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -138,7 +142,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v, w
-		DataSet<Tuple3<K, K, K>> triangles = input
+		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setLittleParallelism(littleParallelism));
 
@@ -176,11 +180,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * @param <T> ID type
 	 */
 	private static class SplitTriangles<T>
-	implements FlatMapFunction<Tuple3<T, T, T>, Tuple2<T, LongValue>> {
+	implements FlatMapFunction<TriangleListing.Result<T>, Tuple2<T, LongValue>> {
 		private Tuple2<T, LongValue> output = new Tuple2<>(null, new LongValue(1));
 
 		@Override
-		public void flatMap(Tuple3<T, T, T> value, Collector<Tuple2<T, LongValue>> out)
+		public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
 				throws Exception {
 			output.f0 = value.f0;
 			out.collect(output);
@@ -198,7 +202,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFields("0")
+	@ForwardedFields("0")
 	private static class CountTriangles<T>
 	implements ReduceFunction<Tuple2<T, LongValue>> {
 		@Override
@@ -214,8 +218,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@FunctionAnnotation.ForwardedFieldsFirst("0; 1->1.0")
-	@FunctionAnnotation.ForwardedFieldsSecond("0")
+	@ForwardedFieldsFirst("0; 1")
+	@ForwardedFieldsSecond("0")
 	private static class JoinVertexDegreeWithTriangleCount<T>
 	implements JoinFunction<Vertex<T, LongValue>, Tuple2<T, LongValue>, Result<T>> {
 		private LongValue zero = new LongValue(0);
@@ -226,26 +230,28 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public Result<T> join(Vertex<T, LongValue> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
 				throws Exception {
 			output.f0 = vertexAndDegree.f0;
-			output.f1.f0 = vertexAndDegree.f1;
-			output.f1.f1 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+			output.f1 = vertexAndDegree.f1;
+			output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
 
 			return output;
 		}
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the Local Clustering Coefficient algorithm.
+	 * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Vertex<T, Tuple2<LongValue, LongValue>> {
+	extends Tuple3<T, LongValue, LongValue>
+	implements PrintableResult, UnaryResult<T> {
 		private static final int HASH_SEED = 0xc23937c1;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
 
-		public Result() {
-			f1 = new Tuple2<>();
+		@Override
+		public T getVertexId0() {
+			return f0;
 		}
 
 		/**
@@ -254,7 +260,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return vertex degree
 		 */
 		public LongValue getDegree() {
-			return f1.f0;
+			return f1;
 		}
 
 		/**
@@ -264,7 +270,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return triangle count
 		 */
 		public LongValue getTriangleCount() {
-			return f1.f1;
+			return f2;
 		}
 
 		/**
@@ -289,8 +295,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 *
 		 * @return verbose string
 		 */
-		public String toVerboseString() {
-			return "Vertex ID: " + f0
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
 				+ ", vertex degree: " + getDegree()
 				+ ", triangle count: " + getTriangleCount()
 				+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
@@ -300,8 +306,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public int hashCode() {
 			return hasher.reset()
 				.hash(f0.hashCode())
-				.hash(f1.f0.getValue())
-				.hash(f1.f1.getValue())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
 				.hash();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index 7482af0..604621d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.dataset.Count;
-import org.apache.flink.graph.library.clustering.directed.TriangleListing;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
@@ -140,7 +140,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps triadic census metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private final BigInteger[] counts;
 
 		public Result(BigInteger... counts) {
@@ -212,7 +213,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
 			return "03: " + nf.format(getCount03())

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 09b9a5d..e72c4cd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -33,6 +33,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.TertiaryResult;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
@@ -64,10 +67,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
-	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
+	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
 
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -132,7 +135,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 	 */
 
 	@Override
-	public DataSet<Tuple3<K, K, K>> runInternal(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v where u < v
 		DataSet<Tuple2<K, K>> filteredByID = input
@@ -160,7 +163,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 				.name("Generate triplets");
 
 		// u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w
-		DataSet<Tuple3<K, K, K>> triangles = triplets
+		DataSet<Result<K>> triangles = triplets
 			.join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
 			.where(1, 2)
 			.equalTo(0, 1)
@@ -290,11 +293,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 	@ForwardedFieldsFirst("0; 1; 2")
 	@ForwardedFieldsSecond("0; 1")
 	private static final class ProjectTriangles<T>
-	implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Tuple3<T, T, T>> {
+	implements JoinFunction<Tuple3<T, T, T>, Tuple2<T, T>, Result<T>> {
+		private Result<T> output = new Result<>();
+
 		@Override
-		public Tuple3<T, T, T> join(Tuple3<T, T, T> triplet, Tuple2<T, T> edge)
+		public Result<T> join(Tuple3<T, T, T> triplet, Tuple2<T, T> edge)
 				throws Exception {
-			return triplet;
+			output.f0 = triplet.f0;
+			output.f1 = triplet.f1;
+			output.f2 = triplet.f2;
+			return output;
 		}
 	}
 
@@ -305,9 +313,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 	 * @param <T> ID type
 	 */
 	private static final class SortTriangleVertices<T extends Comparable<T>>
-	implements MapFunction<Tuple3<T, T, T>, Tuple3<T, T, T>> {
+	implements MapFunction<Result<T>, Result<T>> {
 		@Override
-		public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
+		public Result<T> map(Result<T> value)
 				throws Exception {
 			// by the triangle listing algorithm we know f1 < f2
 			if (value.f0.compareTo(value.f1) > 0) {
@@ -325,4 +333,39 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 			return value;
 		}
 	}
+
+	/**
+	 * Wraps {@link Tuple3} to encapsulate results from the undirected Triangle Listing algorithm.
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple3<T, T, T>
+	implements PrintableResult, TertiaryResult<T> {
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public T getVertexId1() {
+			return f1;
+		}
+
+		@Override
+		public T getVertexId2() {
+			return f2;
+		}
+
+		/**
+		 * Format values into a human-readable string.
+		 *
+		 * @return verbose string
+		 */
+		public String toPrintableString() {
+			return "1st vertex ID: " + getVertexId0()
+				+ ", 2nd vertex ID: " + getVertexId1()
+				+ ", 3rd vertex ID: " + getVertexId2();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index f4195f7..216cf50 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -36,7 +36,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
@@ -511,7 +512,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFields("0")
+	@ForwardedFields("0; 1; 2")
 	private static class TranslateResult<T>
 	implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
 		private Result<T> output = new Result<>();
@@ -519,25 +520,27 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
 			output.f0 = value.f0;
-			output.f1.f0 = value.f1;
-			output.f1.f1 = value.f2;
+			output.f1 = value.f1;
+			output.f2 = value.f2;
 			return output;
 		}
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the HITS algorithm.
+	 * Wraps the {@link Tuple3} to encapsulate results from the HITS algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Vertex<T, Tuple2<DoubleValue, DoubleValue>> {
+	extends Tuple3<T, DoubleValue, DoubleValue>
+	implements PrintableResult, UnaryResult<T> {
 		public static final int HASH_SEED = 0xc7e39a63;
 
 		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
 
-		public Result() {
-			f1 = new Tuple2<>();
+		@Override
+		public T getVertexId0() {
+			return f0;
 		}
 
 		/**
@@ -546,7 +549,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return the hub score
 		 */
 		public DoubleValue getHubScore() {
-			return f1.f0;
+			return f1;
 		}
 
 		/**
@@ -555,11 +558,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return the authority score
 		 */
 		public DoubleValue getAuthorityScore() {
-			return f1.f1;
+			return f2;
 		}
 
-		public String toVerboseString() {
-			return "Vertex ID: " + f0
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
 				+ ", hub score: " + getHubScore()
 				+ ", authority score: " + getAuthorityScore();
 		}
@@ -568,8 +571,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public int hashCode() {
 			return hasher.reset()
 				.hash(f0.hashCode())
-				.hash(f1.f0.getValue())
-				.hash(f1.f1.getValue())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
 				.hash();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index 514fd4e..57743e8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -211,7 +211,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Send score")
 			.groupBy(0)
 			.reduce(new SumScore<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index fba72ed..82cc607 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -37,6 +37,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -272,7 +273,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps edge metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long triangleTripletCount;
 		private long rectangleTripletCount;
 		private long maximumTriangleTriplets;
@@ -323,7 +325,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
 			return "triangle triplet count: " + nf.format(triangleTripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 231631b..9764f6b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
 
 import java.io.IOException;
@@ -192,7 +193,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps vertex metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long vertexCount;
 		private long unidirectionalEdgeCount;
 		private long bidirectionalEdgeCount;
@@ -258,8 +260,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		 *
 		 * @return average degree
 		 */
-		public float getAverageDegree() {
-			return vertexCount == 0 ? Float.NaN : getNumberOfEdges() / (float)vertexCount;
+		public double getAverageDegree() {
+			return vertexCount == 0 ? Double.NaN : getNumberOfEdges() / (double)vertexCount;
 		}
 
 		/**
@@ -270,8 +272,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		 *
 		 * @return density
 		 */
-		public float getDensity() {
-			return vertexCount <= 1 ? Float.NaN : getNumberOfEdges() / (float)(vertexCount*(vertexCount-1));
+		public double getDensity() {
+			return vertexCount <= 1 ? Double.NaN : getNumberOfEdges() / (double)(vertexCount*(vertexCount-1));
 		}
 
 		/**
@@ -320,15 +322,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
+			// format for very small fractional numbers
+			NumberFormat ff = NumberFormat.getInstance();
+			ff.setMaximumFractionDigits(8);
+
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(getNumberOfEdges())
 				+ "; unidirectional edge count: " + nf.format(unidirectionalEdgeCount)
 				+ "; bidirectional edge count: " + nf.format(bidirectionalEdgeCount)
 				+ "; average degree: " + nf.format(getAverageDegree())
-				+ "; density: " + nf.format(getDensity())
+				+ "; density: " + ff.format(getDensity())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum out degree: " + nf.format(maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index af4a57f..31f01d8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -33,6 +33,7 @@ import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
 import org.apache.flink.types.LongValue;
 
@@ -245,7 +246,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps edge metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long triangleTripletCount;
 		private long rectangleTripletCount;
 		private long maximumTriangleTriplets;
@@ -296,7 +298,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
 			return "triangle triplet count: " + nf.format(triangleTripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/33cd9795/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 0fd1428..dd2411e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
 import org.apache.flink.types.LongValue;
 
@@ -183,7 +184,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/**
 	 * Wraps vertex metrics.
 	 */
-	public static class Result {
+	public static class Result
+	implements PrintableResult {
 		private long vertexCount;
 		private long edgeCount;
 		private long tripletCount;
@@ -225,9 +227,9 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		 *
 		 * @return average degree
 		 */
-		public float getAverageDegree() {
+		public double getAverageDegree() {
 			// each edge is incident on two vertices
-			return vertexCount == 0 ? Float.NaN : 2 * edgeCount / (float)vertexCount;
+			return vertexCount == 0 ? Double.NaN : 2 * edgeCount / (double)vertexCount;
 		}
 
 		/**
@@ -238,8 +240,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		 *
 		 * @return density
 		 */
-		public float getDensity() {
-			return vertexCount <= 1 ? Float.NaN : edgeCount / (float)(vertexCount*(vertexCount-1)/2);
+		public double getDensity() {
+			return vertexCount <= 1 ? Double.NaN : edgeCount / (double)(vertexCount*(vertexCount-1)/2);
 		}
 
 		/**
@@ -270,13 +272,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		@Override
-		public String toString() {
+		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 
+			// format for very small fractional numbers
+			NumberFormat ff = NumberFormat.getInstance();
+			ff.setMaximumFractionDigits(8);
+
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
 				+ "; average degree: " + nf.format(getAverageDegree())
-				+ "; density: " + nf.format(getDensity())
+				+ "; density: " + ff.format(getDensity())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum triplets: " + nf.format(maximumTriplets);