You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/12/14 21:11:36 UTC

flink git commit: [FLINK-4936] [gelly] Operator names for Gelly inputs

Repository: flink
Updated Branches:
  refs/heads/master 21d1d8b49 -> 09e081730


[FLINK-4936] [gelly] Operator names for Gelly inputs

Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined functions.

This closes #2832


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09e08173
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09e08173
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09e08173

Branch: refs/heads/master
Commit: 09e0817306e8c077aeea96252a7c98fbd0f9747b
Parents: 21d1d8b
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Dec 14 15:03:45 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 408 +++++++++++--------
 .../org/apache/flink/graph/GraphCsvReader.java  |  67 ++-
 .../graph/generator/AbstractGraphGenerator.java |   4 +-
 .../flink/graph/utils/Tuple2ToEdgeMap.java      |  48 +++
 4 files changed, 333 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 7854c54..c6843e4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -55,6 +55,7 @@ import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.ScatterGatherIteration;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.Tuple2ToEdgeMap;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -74,8 +75,7 @@ import java.util.NoSuchElementException;
 /**
  * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
  * vertices}.
- * 
- * 
+ *
  * @see org.apache.flink.graph.Edge
  * @see org.apache.flink.graph.Vertex
  * 
@@ -176,17 +176,24 @@ public class Graph<K, VV, EV> {
 	public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
 			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
 
-		DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
+		DataSet<Vertex<K, NullValue>> vertices = edges
+			.flatMap(new EmitSrcAndTarget<K, EV>())
+				.name("Source and target IDs")
+			.distinct()
+				.name("IDs");
 
 		return new Graph<>(vertices, edges, context);
 	}
 
-	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
-			Edge<K, EV>, Vertex<K, NullValue>> {
+	private static final class EmitSrcAndTarget<K, EV>
+	implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
+		private Vertex<K, NullValue> output = new Vertex<>(null, NullValue.getInstance());
 
 		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
-			out.collect(new Vertex<>(edge.f0, NullValue.getInstance()));
-			out.collect(new Vertex<>(edge.f1, NullValue.getInstance()));
+			output.f0 = edge.f0;
+			out.collect(output);
+			output.f0 = edge.f1;
+			out.collect(output);
 		}
 	}
 
@@ -214,22 +221,32 @@ public class Graph<K, VV, EV> {
 				Vertex.class, keyType, valueType);
 
 		DataSet<Vertex<K, VV>> vertices = edges
-				.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
-				.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
-					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-						return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0));
-					}
-				}).returns(returnType).withForwardedFields("f0");
+			.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
+				.name("Source and target IDs")
+			.distinct()
+				.name("IDs")
+			.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
+				private Vertex<K, VV> output = new Vertex<>();
+
+				public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
+					output.f0 = value.f0;
+					output.f1 = vertexValueInitializer.map(value.f0);
+					return output;
+				}
+			}).returns(returnType).withForwardedFields("f0").name("Initialize vertex values");
 
 		return new Graph<>(vertices, edges, context);
 	}
 
-	private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
-		Edge<K, EV>, Tuple1<K>> {
+	private static final class EmitSrcAndTargetAsTuple1<K, EV>
+	implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+		private Tuple1<K> output = new Tuple1<>();
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-			out.collect(new Tuple1<>(edge.f0));
-			out.collect(new Tuple1<>(edge.f1));
+			output.f0 = edge.f0;
+			out.collect(output);
+			output.f0 = edge.f1;
+			out.collect(output);
 		}
 	}
 
@@ -251,8 +268,14 @@ public class Graph<K, VV, EV> {
 	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
 			DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
 
-		DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+		DataSet<Vertex<K, VV>> vertexDataSet = vertices
+			.map(new Tuple2ToVertexMap<K, VV>())
+				.name("Type conversion");
+
+		DataSet<Edge<K, EV>> edgeDataSet = edges
+			.map(new Tuple3ToEdgeMap<K, EV>())
+				.name("Type conversion");
+
 		return fromDataSet(vertexDataSet, edgeDataSet, context);
 	}
 
@@ -272,7 +295,10 @@ public class Graph<K, VV, EV> {
 	public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
 			ExecutionEnvironment context) {
 
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+		DataSet<Edge<K, EV>> edgeDataSet = edges
+			.map(new Tuple3ToEdgeMap<K, EV>())
+				.name("Type conversion");
+
 		return fromDataSet(edgeDataSet, context);
 	}
 
@@ -295,7 +321,10 @@ public class Graph<K, VV, EV> {
 	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
 			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
 
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+		DataSet<Edge<K, EV>> edgeDataSet = edges
+			.map(new Tuple3ToEdgeMap<K, EV>())
+				.name("Type conversion");
+
 		return fromDataSet(edgeDataSet, vertexValueInitializer, context);
 	}
 
@@ -313,13 +342,10 @@ public class Graph<K, VV, EV> {
 	public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
 			ExecutionEnvironment context) {
 
-		DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
-				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+		DataSet<Edge<K, NullValue>> edgeDataSet = edges
+			.map(new Tuple2ToEdgeMap<K>())
+				.name("To Edge");
 
-					public Edge<K, NullValue> map(Tuple2<K, K> input) {
-						return new Edge<>(input.f0, input.f1, NullValue.getInstance());
-					}
-		}).withForwardedFields("f0; f1");
 		return fromDataSet(edgeDataSet, context);
 	}
 
@@ -341,13 +367,10 @@ public class Graph<K, VV, EV> {
 	public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
 			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
 
-		DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
-				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+		DataSet<Edge<K, NullValue>> edgeDataSet = edges
+			.map(new Tuple2ToEdgeMap<K>())
+				.name("To Edge");
 
-					public Edge<K, NullValue> map(Tuple2<K, K> input) {
-						return new Edge<>(input.f0, input.f1, NullValue.getInstance());
-					}
-				}).withForwardedFields("f0; f1");
 		return fromDataSet(edgeDataSet, vertexValueInitializer, context);
 	}
 
@@ -458,10 +481,13 @@ public class Graph<K, VV, EV> {
 	 * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
 	 */
 	public DataSet<Triplet<K, VV, EV>> getTriplets() {
-		return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
-				.with(new ProjectEdgeWithSrcValue<K, VV, EV>())
-				.join(this.getVertices()).where(1).equalTo(0)
-				.with(new ProjectEdgeWithVertexValues<K, VV, EV>());
+		return this.getVertices()
+			.join(this.getEdges()).where(0).equalTo(0)
+			.with(new ProjectEdgeWithSrcValue<K, VV, EV>())
+				.name("Project edge with source value")
+			.join(this.getVertices()).where(1).equalTo(0)
+			.with(new ProjectEdgeWithVertexValues<K, VV, EV>())
+				.name("Project edge with vertex values");
 	}
 
 	@ForwardedFieldsFirst("f1->f2")
@@ -521,12 +547,17 @@ public class Graph<K, VV, EV> {
 	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
 		DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
 				new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
+					private Vertex<K, NV> output = new Vertex<>();
+
 					public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
-						return new Vertex<>(value.f0, mapper.map(value));
+						output.f0 = value.f0;
+						output.f1 = mapper.map(value);
+						return output;
 					}
 				})
 				.returns(returnType)
-				.withForwardedFields("f0");
+				.withForwardedFields("f0")
+					.name("Map vertices");
 
 		return new Graph<>(mappedVertices, this.edges, this.context);
 	}
@@ -551,6 +582,32 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	 * Apply a function to the attribute of each edge in the graph.
+	 *
+	 * @param mapper the map function to apply.
+	 * @param returnType the explicit return type.
+	 * @return a new graph
+	 */
+	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
+		DataSet<Edge<K, NV>> mappedEdges = edges.map(
+			new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
+				private Edge<K, NV> output = new Edge<>();
+
+				public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
+					output.f0 = value.f0;
+					output.f1 = value.f1;
+					output.f2 = mapper.map(value);
+					return output;
+				}
+			})
+			.returns(returnType)
+			.withForwardedFields("f0; f1")
+				.name("Map edges");
+
+		return new Graph<>(this.vertices, mappedEdges, this.context);
+	}
+
+	/**
 	 * Translate {@link Vertex} and {@link Edge} IDs using the given {@link MapFunction}.
 	 *
 	 * @param translator implements conversion from {@code K} to {@code NEW}
@@ -587,26 +644,6 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Apply a function to the attribute of each edge in the graph.
-	 *
-	 * @param mapper the map function to apply.
-	 * @param returnType the explicit return type.
-	 * @return a new graph
-	 */
-	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
-		DataSet<Edge<K, NV>> mappedEdges = edges.map(
-				new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
-					public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
-						return new Edge<>(value.f0, value.f1, mapper.map(value));
-					}
-				})
-				.returns(returnType)
-				.withForwardedFields("f0; f1");
-
-		return new Graph<>(this.vertices, mappedEdges, this.context);
-	}
-
-	/**
 	 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 	 * a user-defined transformation on the values of the matched records.
 	 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
@@ -627,7 +664,8 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
 				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
+				.with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction))
+					.name("Join with vertices");
 		return new Graph<>(resultedVertices, this.edges, this.context);
 	}
 
@@ -680,12 +718,14 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
-				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
+				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction))
+					.name("Join with edges");
 		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
-			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+	implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+		private Edge<K, EV> output = new Edge<>();
 
 		private EdgeJoinFunction<EV, T> edgeJoinFunction;
 
@@ -704,9 +744,10 @@ public class Graph<K, VV, EV> {
 				if (inputIterator.hasNext()) {
 					final Tuple3<K, K, T> inputNext = inputIterator.next();
 
-					collector.collect(new Edge<>(inputNext.f0,
-							inputNext.f1, edgeJoinFunction.edgeJoin(
-									edgesIterator.next().f2, inputNext.f2)));
+					output.f0 = inputNext.f0;
+					output.f1 = inputNext.f1;
+					output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2);
+					collector.collect(output);
 				} else {
 					collector.collect(edgesIterator.next());
 				}
@@ -734,13 +775,15 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction))
+					.name("Join with edges on source");
 
 		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
-			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+	implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+		private Edge<K, EV> output = new Edge<>();
 
 		private EdgeJoinFunction<EV, T> edgeJoinFunction;
 
@@ -749,8 +792,8 @@ public class Graph<K, VV, EV> {
 		}
 
 		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges,
-				Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>> input,
+				Collector<Edge<K, EV>> collector) throws Exception {
 
 			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
 			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
@@ -761,8 +804,10 @@ public class Graph<K, VV, EV> {
 				while (edgesIterator.hasNext()) {
 					Edge<K, EV> edgesNext = edgesIterator.next();
 
-					collector.collect(new Edge<>(edgesNext.f0,
-							edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
+					output.f0 = edgesNext.f0;
+					output.f1 = edgesNext.f1;
+					output.f2 = edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1);
+					collector.collect(output);
 				}
 
 			} else {
@@ -793,7 +838,8 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(1).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction))
+					.name("Join with edges on target");
 
 		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
@@ -813,7 +859,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
 				.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
 				.join(filteredVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
+				.with(new ProjectEdge<K, VV, EV>()).name("Subgraph");
 
 		DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
 
@@ -834,7 +880,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
 				.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
 				.join(filteredVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
+				.with(new ProjectEdge<K, VV, EV>()).name("Filter on vertices");
 
 		return new Graph<>(filteredVertices, remainingEdges, this.context);
 	}
@@ -847,7 +893,7 @@ public class Graph<K, VV, EV> {
 	 * @return the resulting sub-graph.
 	 */
 	public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
-		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
+		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter).name("Filter on edges");
 
 		return new Graph<>(this.vertices, filteredEdges, this.context);
 	}
@@ -867,7 +913,8 @@ public class Graph<K, VV, EV> {
 	 */
 	public DataSet<Tuple2<K, LongValue>> outDegrees() {
 
-		return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
+		return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>())
+			.name("Out-degree");
 	}
 
 	private static final class CountNeighborsCoGroup<K, VV, EV>
@@ -903,7 +950,8 @@ public class Graph<K, VV, EV> {
 	 */
 	public DataSet<Tuple2<K, LongValue>> inDegrees() {
 
-		return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
+		return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>())
+			.name("In-degree");
 	}
 
 	/**
@@ -912,7 +960,9 @@ public class Graph<K, VV, EV> {
 	 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 	 */
 	public DataSet<Tuple2<K, LongValue>> getDegrees() {
-		return outDegrees().union(inDegrees()).groupBy(0).sum(1);
+		return outDegrees()
+			.union(inDegrees()).name("In- and out-degree")
+			.groupBy(0).sum(1).name("Sum");
 	}
 
 	/**
@@ -922,7 +972,8 @@ public class Graph<K, VV, EV> {
 	 */
 	public Graph<K, VV, EV> getUndirected() {
 
-		DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
+		DataSet<Edge<K, EV>> undirectedEdges = edges.
+			flatMap(new RegularAndReversedEdgesMap<K, EV>()).name("To undirected graph");
 		return new Graph<>(vertices, undirectedEdges, this.context);
 	}
 
@@ -946,13 +997,15 @@ public class Graph<K, VV, EV> {
 		switch (direction) {
 		case IN:
 			return vertices.coGroup(edges).where(0).equalTo(1)
-					.with(new ApplyCoGroupFunction<>(edgesFunction));
+					.with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on in-edges");
 		case OUT:
 			return vertices.coGroup(edges).where(0).equalTo(0)
-					.with(new ApplyCoGroupFunction<>(edgesFunction));
+					.with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges");
 		case ALL:
-			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction));
+			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+						.name("Emit edge"))
+					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
+						.name("GroupReduce on in- and out-edges");
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -979,13 +1032,17 @@ public class Graph<K, VV, EV> {
 		switch (direction) {
 			case IN:
 				return vertices.coGroup(edges).where(0).equalTo(1)
-						.with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunction<>(edgesFunction))
+							.name("GroupReduce on in-edges").returns(typeInfo);
 			case OUT:
 				return vertices.coGroup(edges).where(0).equalTo(0)
-						.with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunction<>(edgesFunction))
+							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo);
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+							.name("Emit edge"))
+						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
+							.name("GroupReduce on in- and out-edges").returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1011,15 +1068,18 @@ public class Graph<K, VV, EV> {
 		switch (direction) {
 		case IN:
 			return edges.map(new ProjectVertexIdMap<K, EV>(1))
-					.withForwardedFields("f1->f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+					.withForwardedFields("f1->f0").name("Vertex ID")
+					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+						.name("GroupReduce on in-edges");
 		case OUT:
 			return edges.map(new ProjectVertexIdMap<K, EV>(0))
-					.withForwardedFields("f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+					.withForwardedFields("f0").name("Vertex ID")
+					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+						.name("GroupReduce on out-edges");
 		case ALL:
-			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+				.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+					.name("GroupReduce on in- and out-edges");
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1045,16 +1105,19 @@ public class Graph<K, VV, EV> {
 
 		switch (direction) {
 			case IN:
-				return edges.map(new ProjectVertexIdMap<K, EV>(1))
+				return edges.map(new ProjectVertexIdMap<K, EV>(1)).name("Vertex ID")
 						.withForwardedFields("f1->f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+							.name("GroupReduce on in-edges").returns(typeInfo);
 			case OUT:
-				return edges.map(new ProjectVertexIdMap<K, EV>(0))
+				return edges.map(new ProjectVertexIdMap<K, EV>(0)).name("Vertex ID")
 						.withForwardedFields("f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+							.name("GroupReduce on in- and out-edges").returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1220,20 +1283,25 @@ public class Graph<K, VV, EV> {
 
 	@ForwardedFields("f0->f1; f1->f0; f2")
 	private static final class ReverseEdgesMap<K, EV>
-			implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
+	implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
+		public Edge<K, EV> output = new Edge<>();
 
-		public Edge<K, EV> map(Edge<K, EV> value) {
-			return new Edge<>(value.f1, value.f0, value.f2);
+		public Edge<K, EV> map(Edge<K, EV> edge) {
+			output.setFields(edge.f1, edge.f0, edge.f2);
+			return output;
 		}
 	}
 
 	private static final class RegularAndReversedEdgesMap<K, EV>
-			implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+	implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+		public Edge<K, EV> output = new Edge<>();
 
 		@Override
 		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
-			out.collect(new Edge<>(edge.f0, edge.f1, edge.f2));
-			out.collect(new Edge<>(edge.f1, edge.f0, edge.f2));
+			out.collect(edge);
+
+			output.setFields(edge.f1, edge.f0, edge.f2);
+			out.collect(output);
 		}
 	}
 
@@ -1244,7 +1312,7 @@ public class Graph<K, VV, EV> {
 	 * @throws UnsupportedOperationException
 	 */
 	public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
-		DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>());
+		DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>()).name("Reverse edges");
 		return new Graph<>(vertices, reversedEdges, this.context);
 	}
 
@@ -1266,7 +1334,7 @@ public class Graph<K, VV, EV> {
 	 * @return The IDs of the vertices as DataSet
 	 */
 	public DataSet<K> getVertexIds() {
-		return vertices.map(new ExtractVertexIDMapper<K, VV>());
+		return vertices.map(new ExtractVertexIDMapper<K, VV>()).name("Vertex IDs");
 	}
 
 	private static final class ExtractVertexIDMapper<K, VV>
@@ -1281,7 +1349,7 @@ public class Graph<K, VV, EV> {
 	 * @return The IDs of the edges as DataSet
 	 */
 	public DataSet<Tuple2<K, K>> getEdgeIds() {
-		return edges.map(new ExtractEdgeIDsMapper<K, EV>());
+		return edges.map(new ExtractEdgeIDsMapper<K, EV>()).name("Edge IDs");
 	}
 
 	@ForwardedFields("f0; f1")
@@ -1317,7 +1385,7 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
 		// Add the vertices
 		DataSet<Vertex<K, VV>> newVertices = this.vertices.coGroup(this.context.fromCollection(verticesToAdd))
-				.where(0).equalTo(0).with(new VerticesUnionCoGroup<K, VV>());
+				.where(0).equalTo(0).with(new VerticesUnionCoGroup<K, VV>()).name("Add vertices");
 
 		return new Graph<>(newVertices, this.edges, this.context);
 	}
@@ -1371,9 +1439,9 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Edge<K,EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
 				.where(0).equalTo(0)
-				.with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>())
+				.with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>()).name("Join with source")
 				.join(this.getVertices()).where(1).equalTo(0)
-				.with(new JoinWithVerticesOnTrg<K, VV, EV>());
+				.with(new JoinWithVerticesOnTrg<K, VV, EV>()).name("Join with target");
 
 		return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context);
 	}
@@ -1435,14 +1503,14 @@ public class Graph<K, VV, EV> {
 	private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
 
 		DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
-				.with(new VerticesRemovalCoGroup<K, VV>());
+				.with(new VerticesRemovalCoGroup<K, VV>()).name("Remove vertices");
 
 		DataSet <Edge< K, EV>> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
 				// if the edge source was removed, the edge will also be removed
-				.with(new ProjectEdgeToBeRemoved<K, VV, EV>())
+				.with(new ProjectEdgeToBeRemoved<K, VV, EV>()).name("Edges to be removed")
 				// if the edge target was removed, the edge will also be removed
 				.join(newVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
+				.with(new ProjectEdge<K, VV, EV>()).name("Remove edges");
 
 		return new Graph<>(newVertices, newEdges, context);
 	}
@@ -1466,8 +1534,6 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-
-
 	@ForwardedFieldsSecond("f0; f1; f2")
 	private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
 		@Override
@@ -1484,7 +1550,7 @@ public class Graph<K, VV, EV> {
 	 *         the removed edges
 	 */
 	public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
-		DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge));
+		DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge)).name("Remove edge");
 		return new Graph<>(this.vertices, newEdges, this.context);
 	}
 
@@ -1512,7 +1578,7 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
 
 		DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
-				.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
+				.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
 
 		return new Graph<>(this.vertices, newEdges, context);
 	}
@@ -1538,8 +1604,18 @@ public class Graph<K, VV, EV> {
 	 * @return a new graph
 	 */
 	public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
-		DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
-		DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
+		DataSet<Vertex<K, VV>> unionedVertices = graph
+			.getVertices()
+			.union(this.getVertices())
+				.name("Vertices")
+			.distinct()
+				.name("Vertices");
+
+		DataSet<Edge<K, EV>> unionedEdges = graph
+			.getEdges()
+			.union(this.getEdges())
+				.name("Edges");
+
 		return new Graph<>(unionedVertices, unionedEdges, this.context);
 	}
 
@@ -1603,8 +1679,9 @@ public class Graph<K, VV, EV> {
 					public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
 						return first;
 					}
-				}).withForwardedFieldsFirst("*")
-				.distinct();
+				}).withForwardedFieldsFirst("*").name("Intersect edges")
+				.distinct()
+					.name("Edges");
 	}
 
 	/**
@@ -1619,7 +1696,8 @@ public class Graph<K, VV, EV> {
 				.coGroup(edges)
 				.where(0, 1, 2)
 				.equalTo(0, 1, 2)
-				.with(new MatchingEdgeReducer<K, EV>());
+				.with(new MatchingEdgeReducer<K, EV>())
+					.name("Intersect edges");
 	}
 
 	/**
@@ -1827,27 +1905,27 @@ public class Graph<K, VV, EV> {
 		case IN:
 			// create <edge-sourceVertex> pairs
 			DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-					.join(this.vertices).where(0).equalTo(0);
+					.join(this.vertices).where(0).equalTo(0).name("Edge with source vertex");
 			return vertices.coGroup(edgesWithSources)
 					.where(0).equalTo("f0.f1")
-					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
+					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function");
 		case OUT:
 			// create <edge-targetVertex> pairs
 			DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-					.join(this.vertices).where(1).equalTo(0);
+					.join(this.vertices).where(1).equalTo(0).name("Edge with target vertex");
 			return vertices.coGroup(edgesWithTargets)
 					.where(0).equalTo("f0.f0")
-					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
+					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function");
 		case ALL:
 			// create <edge-sourceOrTargetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
 					.join(this.vertices).where(1).equalTo(0)
-					.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+					.with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex");
 
 			return vertices.coGroup(edgesWithNeighbors)
 					.where(0).equalTo(0)
-					.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction));
+					.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).name("Neighbors function");
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1875,27 +1953,30 @@ public class Graph<K, VV, EV> {
 			case IN:
 				// create <edge-sourceVertex> pairs
 				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-						.join(this.vertices).where(0).equalTo(0);
+						.join(this.vertices).where(0).equalTo(0).name("Edge with source vertex");
 				return vertices.coGroup(edgesWithSources)
 						.where(0).equalTo("f0.f1")
-						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction))
+							.name("Neighbors function").returns(typeInfo);
 			case OUT:
 				// create <edge-targetVertex> pairs
 				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-						.join(this.vertices).where(1).equalTo(0);
+						.join(this.vertices).where(1).equalTo(0).name("Edge with target vertex");
 				return vertices.coGroup(edgesWithTargets)
 						.where(0).equalTo("f0.f0")
-						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction))
+							.name("Neighbors function").returns(typeInfo);
 			case ALL:
 				// create <edge-sourceOrTargetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
 						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex");
 
 				return vertices.coGroup(edgesWithNeighbors)
 						.where(0).equalTo(0)
-						.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction))
+							.name("Neighbors function").returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1924,26 +2005,26 @@ public class Graph<K, VV, EV> {
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
 					.join(this.vertices).where(0).equalTo(0)
 					.with(new ProjectVertexIdJoin<K, VV, EV>(1))
-					.withForwardedFieldsFirst("f1->f0");
+					.withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID");
 			return edgesWithSources.groupBy(0).reduceGroup(
-				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
 		case OUT:
 			// create <edge-targetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
 					.join(this.vertices).where(1).equalTo(0)
 					.with(new ProjectVertexIdJoin<K, VV, EV>(0))
-					.withForwardedFieldsFirst("f0");
+					.withForwardedFieldsFirst("f0").name("Edge with target vertex ID");
 			return edgesWithTargets.groupBy(0).reduceGroup(
-				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
 		case ALL:
 			// create <edge-sourceOrTargetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
 					.join(this.vertices).where(1).equalTo(0)
-					.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+					.with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex ID");
 
 			return edgesWithNeighbors.groupBy(0).reduceGroup(
-				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1973,26 +2054,29 @@ public class Graph<K, VV, EV> {
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
 						.join(this.vertices).where(0).equalTo(0)
 						.with(new ProjectVertexIdJoin<K, VV, EV>(1))
-						.withForwardedFieldsFirst("f1->f0");
+						.withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID");
 				return edgesWithSources.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+						.name("Neighbors function").returns(typeInfo);
 			case OUT:
 				// create <edge-targetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
 						.join(this.vertices).where(1).equalTo(0)
 						.with(new ProjectVertexIdJoin<K, VV, EV>(0))
-						.withForwardedFieldsFirst("f0");
+						.withForwardedFieldsFirst("f0").name("Edge with target vertex ID");
 				return edgesWithTargets.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+						.name("Neighbors function").returns(typeInfo);
 			case ALL:
 				// create <edge-sourceOrTargetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
 						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
 						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex ID");
 
 				return edgesWithNeighbors.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+						.name("Neighbors function").returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -2170,26 +2254,26 @@ public class Graph<K, VV, EV> {
 				final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
 						.join(this.vertices).where(0).equalTo(0)
 						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1))
-						.withForwardedFieldsFirst("f1->f0");
+						.withForwardedFieldsFirst("f1->f0").name("Vertex with in-neighbor value");
 				return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
+						reduceNeighborsFunction)).name("Neighbors function");
 			case OUT:
 				// create <vertex-target value> pairs
 				DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
 						.join(this.vertices).where(1).equalTo(0)
 						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0))
-						.withForwardedFieldsFirst("f0");
+						.withForwardedFieldsFirst("f0").name("Vertex with out-neighbor value");
 				return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
+						reduceNeighborsFunction)).name("Neighbors function");
 			case ALL:
 				// create <vertex-neighbor value> pairs
 				DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
 						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
 						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectNeighborValue<K, VV, EV>());
+						.with(new ProjectNeighborValue<K, VV, EV>()).name("Vertex with neighbor value");
 
 				return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
+						reduceNeighborsFunction)).name("Neighbors function");
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -2231,15 +2315,21 @@ public class Graph<K, VV, EV> {
 			case IN:
 				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
 						.withForwardedFields("f1->f0")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+							.name("Vertex with in-edges")
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+							.name("Reduce on edges");
 			case OUT:
 				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
 						.withForwardedFields("f0->f0")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+							.name("Vertex with out-edges")
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+							.name("Reduce on edges");
 			case ALL:
 				return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, EV>())
 						.withForwardedFields("f2->f1")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+							.name("Vertex with all edges")
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+							.name("Reduce on edges");
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index 4859e12..6547f9a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -20,12 +20,14 @@ package org.apache.flink.graph;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.graph.utils.Tuple2ToEdgeMap;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -44,9 +46,6 @@ public class GraphCsvReader {
 	protected CsvReader edgeReader;
 	protected CsvReader vertexReader;
 	protected MapFunction<?, ?> mapper;
-	protected Class<?> vertexKey;
-	protected Class<?> vertexValue;
-	protected Class<?> edgeValue;
 
 //--------------------------------------------------------------------------------------------------------------------
 	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
@@ -104,17 +103,18 @@ public class GraphCsvReader {
 	public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
 			Class<EV> edgeValue) {
 
-		DataSet<Tuple2<K, VV>> vertices;
-
 		if (edgeReader == null) {
-			throw new RuntimeException("The edges input file cannot be null!");
+			throw new RuntimeException("The edge input file cannot be null!");
 		}
 
 		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
 
 		// the vertex value can be provided by an input file or a user-defined mapper
 		if (vertexReader != null) {
-			vertices = vertexReader.types(vertexKey, vertexValue);
+			DataSet<Tuple2<K, VV>> vertices = vertexReader
+				.types(vertexKey, vertexValue)
+					.name(GraphCsvReader.class.getName());
+
 			return Graph.fromTupleDataSet(vertices, edges, executionContext);
 		}
 		else if (mapper != null) {
@@ -135,10 +135,12 @@ public class GraphCsvReader {
 	public <K, EV> Graph<K, NullValue, EV> edgeTypes(Class<K> vertexKey, Class<EV> edgeValue) {
 
 		if (edgeReader == null) {
-			throw new RuntimeException("The edges input file cannot be null!");
+			throw new RuntimeException("The edge input file cannot be null!");
 		}
 
-		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+		DataSet<Tuple3<K, K, EV>> edges = edgeReader
+			.types(vertexKey, vertexKey, edgeValue)
+				.name(GraphCsvReader.class.getName());
 
 		return Graph.fromTupleDataSet(edges, executionContext);
 	}
@@ -151,20 +153,16 @@ public class GraphCsvReader {
 	public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
 
 		if (edgeReader == null) {
-			throw new RuntimeException("The edges input file cannot be null!");
+			throw new RuntimeException("The edge input file cannot be null!");
 		}
 
-		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
-				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
-
-					private static final long serialVersionUID = -2981792951286476970L;
+		DataSet<Edge<K, NullValue>> edges = edgeReader
+			.types(vertexKey, vertexKey)
+				.name(GraphCsvReader.class.getName())
+			.map(new Tuple2ToEdgeMap<K>())
+				.name("Type conversion");
 
-					public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
-						return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
-					}
-				}).withForwardedFields("f0;f1");
-
-		return Graph.fromTupleDataSet(edges, executionContext);
+		return Graph.fromDataSet(edges, executionContext);
 	}
 
 	/**
@@ -178,28 +176,29 @@ public class GraphCsvReader {
 	 */
 	@SuppressWarnings({ "serial", "unchecked" })
 	public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
-		
-		DataSet<Tuple2<K, VV>> vertices;
 
 		if (edgeReader == null) {
-			throw new RuntimeException("The edges input file cannot be null!");
+			throw new RuntimeException("The edge input file cannot be null!");
 		}
 
-		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
-				.map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
-
-					public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
-						return new Tuple3<>(input.f0, input.f1, NullValue.getInstance());
-					}
-				}).withForwardedFields("f0;f1");
+		DataSet<Edge<K, NullValue>> edges = edgeReader
+			.types(vertexKey, vertexKey)
+				.name(GraphCsvReader.class.getName())
+			.map(new Tuple2ToEdgeMap<K>())
+				.name("To Edge");
 
 		// the vertex value can be provided by an input file or a user-defined mapper
 		if (vertexReader != null) {
-			vertices = vertexReader.types(vertexKey, vertexValue);
-			return Graph.fromTupleDataSet(vertices, edges, executionContext);
+			DataSet<Vertex<K, VV>> vertices = vertexReader
+				.types(vertexKey, vertexValue)
+					.name(GraphCsvReader.class.getName())
+				.map(new Tuple2ToVertexMap<K, VV>())
+					.name("Type conversion");
+
+			return Graph.fromDataSet(vertices, edges, executionContext);
 		}
 		else if (mapper != null) {
-			return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+			return Graph.fromDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
 		}
 		else {
 			throw new RuntimeException("Vertex values have to be specified through a vertices input file"

http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
index 2eee6d7..7f37356 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.graph.generator;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 public abstract class AbstractGraphGenerator<K, VV, EV>
 implements GraphGenerator<K, VV, EV> {
 
 	// Optional configuration
-	protected int parallelism = -1;
+	protected int parallelism = PARALLELISM_DEFAULT;
 
 	@Override
 	public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
new file mode 100644
index 0000000..5eb8287
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Create an {@link Edge} from a {@link Tuple2}, copying fields f0 and f1.
+ *
+ * <p>The edge value is set to {@link NullValue}; one Edge instance is reused across calls.
+ *
+ * @param <K> type of the tuple fields copied into f0 and f1 of the edge
+ */
+@ForwardedFields("f0; f1")
+public class Tuple2ToEdgeMap<K> implements MapFunction<Tuple2<K, K>, Edge<K, NullValue>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Edge<K, NullValue> edge = new Edge<>(null, null, NullValue.getInstance()); // reused output object
+
+	@Override
+	public Edge<K, NullValue> map(Tuple2<K, K> tuple) {
+		edge.f0 = tuple.f0; // overwrite the reused edge in place
+		edge.f1 = tuple.f1;
+		return edge; // same instance on every call
+	}
+
+}