You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/12/14 21:11:36 UTC
flink git commit: [FLINK-4936] [gelly] Operator names for Gelly inputs
Repository: flink
Updated Branches:
refs/heads/master 21d1d8b49 -> 09e081730
[FLINK-4936] [gelly] Operator names for Gelly inputs
Provide descriptive operator names for Graph and GraphCsvReader.
Condense multiple type conversion maps into a single mapper.
Reuse objects in operations wrapping user-defined functions.
This closes #2832
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09e08173
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09e08173
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09e08173
Branch: refs/heads/master
Commit: 09e0817306e8c077aeea96252a7c98fbd0f9747b
Parents: 21d1d8b
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Dec 14 15:03:45 2016 -0500
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Graph.java | 408 +++++++++++--------
.../org/apache/flink/graph/GraphCsvReader.java | 67 ++-
.../graph/generator/AbstractGraphGenerator.java | 4 +-
.../flink/graph/utils/Tuple2ToEdgeMap.java | 48 +++
4 files changed, 333 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 7854c54..c6843e4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -55,6 +55,7 @@ import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.ScatterGatherIteration;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.Tuple2ToEdgeMap;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -74,8 +75,7 @@ import java.util.NoSuchElementException;
/**
* Represents a Graph consisting of {@link Edge edges} and {@link Vertex
* vertices}.
- *
- *
+ *
* @see org.apache.flink.graph.Edge
* @see org.apache.flink.graph.Vertex
*
@@ -176,17 +176,24 @@ public class Graph<K, VV, EV> {
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
- DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
+ DataSet<Vertex<K, NullValue>> vertices = edges
+ .flatMap(new EmitSrcAndTarget<K, EV>())
+ .name("Source and target IDs")
+ .distinct()
+ .name("IDs");
return new Graph<>(vertices, edges, context);
}
- private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Vertex<K, NullValue>> {
+ private static final class EmitSrcAndTarget<K, EV>
+ implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
+ private Vertex<K, NullValue> output = new Vertex<>(null, NullValue.getInstance());
public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
- out.collect(new Vertex<>(edge.f0, NullValue.getInstance()));
- out.collect(new Vertex<>(edge.f1, NullValue.getInstance()));
+ output.f0 = edge.f0;
+ out.collect(output);
+ output.f0 = edge.f1;
+ out.collect(output);
}
}
@@ -214,22 +221,32 @@ public class Graph<K, VV, EV> {
Vertex.class, keyType, valueType);
DataSet<Vertex<K, VV>> vertices = edges
- .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
- .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
- public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
- return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0));
- }
- }).returns(returnType).withForwardedFields("f0");
+ .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
+ .name("Source and target IDs")
+ .distinct()
+ .name("IDs")
+ .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
+ private Vertex<K, VV> output = new Vertex<>();
+
+ public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
+ output.f0 = value.f0;
+ output.f1 = vertexValueInitializer.map(value.f0);
+ return output;
+ }
+ }).returns(returnType).withForwardedFields("f0").name("Initialize vertex values");
return new Graph<>(vertices, edges, context);
}
- private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
- Edge<K, EV>, Tuple1<K>> {
+ private static final class EmitSrcAndTargetAsTuple1<K, EV>
+ implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+ private Tuple1<K> output = new Tuple1<>();
public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
- out.collect(new Tuple1<>(edge.f0));
- out.collect(new Tuple1<>(edge.f1));
+ output.f0 = edge.f0;
+ out.collect(output);
+ output.f0 = edge.f1;
+ out.collect(output);
}
}
@@ -251,8 +268,14 @@ public class Graph<K, VV, EV> {
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
- DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+ DataSet<Vertex<K, VV>> vertexDataSet = vertices
+ .map(new Tuple2ToVertexMap<K, VV>())
+ .name("Type conversion");
+
+ DataSet<Edge<K, EV>> edgeDataSet = edges
+ .map(new Tuple3ToEdgeMap<K, EV>())
+ .name("Type conversion");
+
return fromDataSet(vertexDataSet, edgeDataSet, context);
}
@@ -272,7 +295,10 @@ public class Graph<K, VV, EV> {
public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
ExecutionEnvironment context) {
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+ DataSet<Edge<K, EV>> edgeDataSet = edges
+ .map(new Tuple3ToEdgeMap<K, EV>())
+ .name("Type conversion");
+
return fromDataSet(edgeDataSet, context);
}
@@ -295,7 +321,10 @@ public class Graph<K, VV, EV> {
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
- DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
+ DataSet<Edge<K, EV>> edgeDataSet = edges
+ .map(new Tuple3ToEdgeMap<K, EV>())
+ .name("Type conversion");
+
return fromDataSet(edgeDataSet, vertexValueInitializer, context);
}
@@ -313,13 +342,10 @@ public class Graph<K, VV, EV> {
public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
ExecutionEnvironment context) {
- DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
- new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+ DataSet<Edge<K, NullValue>> edgeDataSet = edges
+ .map(new Tuple2ToEdgeMap<K>())
+ .name("To Edge");
- public Edge<K, NullValue> map(Tuple2<K, K> input) {
- return new Edge<>(input.f0, input.f1, NullValue.getInstance());
- }
- }).withForwardedFields("f0; f1");
return fromDataSet(edgeDataSet, context);
}
@@ -341,13 +367,10 @@ public class Graph<K, VV, EV> {
public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
- DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
- new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+ DataSet<Edge<K, NullValue>> edgeDataSet = edges
+ .map(new Tuple2ToEdgeMap<K>())
+ .name("To Edge");
- public Edge<K, NullValue> map(Tuple2<K, K> input) {
- return new Edge<>(input.f0, input.f1, NullValue.getInstance());
- }
- }).withForwardedFields("f0; f1");
return fromDataSet(edgeDataSet, vertexValueInitializer, context);
}
@@ -458,10 +481,13 @@ public class Graph<K, VV, EV> {
* @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
*/
public DataSet<Triplet<K, VV, EV>> getTriplets() {
- return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
- .with(new ProjectEdgeWithSrcValue<K, VV, EV>())
- .join(this.getVertices()).where(1).equalTo(0)
- .with(new ProjectEdgeWithVertexValues<K, VV, EV>());
+ return this.getVertices()
+ .join(this.getEdges()).where(0).equalTo(0)
+ .with(new ProjectEdgeWithSrcValue<K, VV, EV>())
+ .name("Project edge with source value")
+ .join(this.getVertices()).where(1).equalTo(0)
+ .with(new ProjectEdgeWithVertexValues<K, VV, EV>())
+ .name("Project edge with vertex values");
}
@ForwardedFieldsFirst("f1->f2")
@@ -521,12 +547,17 @@ public class Graph<K, VV, EV> {
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
+ private Vertex<K, NV> output = new Vertex<>();
+
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
- return new Vertex<>(value.f0, mapper.map(value));
+ output.f0 = value.f0;
+ output.f1 = mapper.map(value);
+ return output;
}
})
.returns(returnType)
- .withForwardedFields("f0");
+ .withForwardedFields("f0")
+ .name("Map vertices");
return new Graph<>(mappedVertices, this.edges, this.context);
}
@@ -551,6 +582,32 @@ public class Graph<K, VV, EV> {
}
/**
+ * Apply a function to the attribute of each edge in the graph.
+ *
+ * @param mapper the map function to apply.
+ * @param returnType the explicit return type.
+ * @return a new graph
+ */
+ public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
+ DataSet<Edge<K, NV>> mappedEdges = edges.map(
+ new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
+ private Edge<K, NV> output = new Edge<>();
+
+ public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
+ output.f0 = value.f0;
+ output.f1 = value.f1;
+ output.f2 = mapper.map(value);
+ return output;
+ }
+ })
+ .returns(returnType)
+ .withForwardedFields("f0; f1")
+ .name("Map edges");
+
+ return new Graph<>(this.vertices, mappedEdges, this.context);
+ }
+
+ /**
* Translate {@link Vertex} and {@link Edge} IDs using the given {@link MapFunction}.
*
* @param translator implements conversion from {@code K} to {@code NEW}
@@ -587,26 +644,6 @@ public class Graph<K, VV, EV> {
}
/**
- * Apply a function to the attribute of each edge in the graph.
- *
- * @param mapper the map function to apply.
- * @param returnType the explicit return type.
- * @return a new graph
- */
- public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
- DataSet<Edge<K, NV>> mappedEdges = edges.map(
- new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
- public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
- return new Edge<>(value.f0, value.f1, mapper.map(value));
- }
- })
- .returns(returnType)
- .withForwardedFields("f0; f1");
-
- return new Graph<>(this.vertices, mappedEdges, this.context);
- }
-
- /**
* Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
* a user-defined transformation on the values of the matched records.
* The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
@@ -627,7 +664,8 @@ public class Graph<K, VV, EV> {
DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
.coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
+ .with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction))
+ .name("Join with vertices");
return new Graph<>(resultedVertices, this.edges, this.context);
}
@@ -680,12 +718,14 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
- .with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
+ .with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction))
+ .name("Join with edges");
return new Graph<>(this.vertices, resultedEdges, this.context);
}
private static final class ApplyCoGroupToEdgeValues<K, EV, T>
- implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+ implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+ private Edge<K, EV> output = new Edge<>();
private EdgeJoinFunction<EV, T> edgeJoinFunction;
@@ -704,9 +744,10 @@ public class Graph<K, VV, EV> {
if (inputIterator.hasNext()) {
final Tuple3<K, K, T> inputNext = inputIterator.next();
- collector.collect(new Edge<>(inputNext.f0,
- inputNext.f1, edgeJoinFunction.edgeJoin(
- edgesIterator.next().f2, inputNext.f2)));
+ output.f0 = inputNext.f0;
+ output.f1 = inputNext.f1;
+ output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2);
+ collector.collect(output);
} else {
collector.collect(edgesIterator.next());
}
@@ -734,13 +775,15 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
+ .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction))
+ .name("Join with edges on source");
return new Graph<>(this.vertices, resultedEdges, this.context);
}
private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
- implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+ implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+ private Edge<K, EV> output = new Edge<>();
private EdgeJoinFunction<EV, T> edgeJoinFunction;
@@ -749,8 +792,8 @@ public class Graph<K, VV, EV> {
}
@Override
- public void coGroup(Iterable<Edge<K, EV>> edges,
- Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
+ public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>> input,
+ Collector<Edge<K, EV>> collector) throws Exception {
final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
@@ -761,8 +804,10 @@ public class Graph<K, VV, EV> {
while (edgesIterator.hasNext()) {
Edge<K, EV> edgesNext = edgesIterator.next();
- collector.collect(new Edge<>(edgesNext.f0,
- edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
+ output.f0 = edgesNext.f0;
+ output.f1 = edgesNext.f1;
+ output.f2 = edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1);
+ collector.collect(output);
}
} else {
@@ -793,7 +838,8 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(1).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
+ .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction))
+ .name("Join with edges on target");
return new Graph<>(this.vertices, resultedEdges, this.context);
}
@@ -813,7 +859,7 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
.join(filteredVertices).where(1).equalTo(0)
- .with(new ProjectEdge<K, VV, EV>());
+ .with(new ProjectEdge<K, VV, EV>()).name("Subgraph");
DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
@@ -834,7 +880,7 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
.join(filteredVertices).where(1).equalTo(0)
- .with(new ProjectEdge<K, VV, EV>());
+ .with(new ProjectEdge<K, VV, EV>()).name("Filter on vertices");
return new Graph<>(filteredVertices, remainingEdges, this.context);
}
@@ -847,7 +893,7 @@ public class Graph<K, VV, EV> {
* @return the resulting sub-graph.
*/
public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
- DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
+ DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter).name("Filter on edges");
return new Graph<>(this.vertices, filteredEdges, this.context);
}
@@ -867,7 +913,8 @@ public class Graph<K, VV, EV> {
*/
public DataSet<Tuple2<K, LongValue>> outDegrees() {
- return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
+ return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>())
+ .name("Out-degree");
}
private static final class CountNeighborsCoGroup<K, VV, EV>
@@ -903,7 +950,8 @@ public class Graph<K, VV, EV> {
*/
public DataSet<Tuple2<K, LongValue>> inDegrees() {
- return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
+ return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>())
+ .name("In-degree");
}
/**
@@ -912,7 +960,9 @@ public class Graph<K, VV, EV> {
* @return A DataSet of {@code Tuple2<vertexId, degree>}
*/
public DataSet<Tuple2<K, LongValue>> getDegrees() {
- return outDegrees().union(inDegrees()).groupBy(0).sum(1);
+ return outDegrees()
+ .union(inDegrees()).name("In- and out-degree")
+ .groupBy(0).sum(1).name("Sum");
}
/**
@@ -922,7 +972,8 @@ public class Graph<K, VV, EV> {
*/
public Graph<K, VV, EV> getUndirected() {
- DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
+ DataSet<Edge<K, EV>> undirectedEdges = edges.
+ flatMap(new RegularAndReversedEdgesMap<K, EV>()).name("To undirected graph");
return new Graph<>(vertices, undirectedEdges, this.context);
}
@@ -946,13 +997,15 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<>(edgesFunction));
+ .with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on in-edges");
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<>(edgesFunction));
+ .with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges");
case ALL:
- return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction));
+ return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+ .name("Emit edge"))
+ .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
+ .name("GroupReduce on in- and out-edges");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -979,13 +1032,17 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1)
- .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunction<>(edgesFunction))
+ .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0)
- .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunction<>(edgesFunction))
+ .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
- return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
- .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo);
+ return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+ .name("Emit edge"))
+ .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
+ .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1011,15 +1068,18 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
return edges.map(new ProjectVertexIdMap<K, EV>(1))
- .withForwardedFields("f1->f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+ .withForwardedFields("f1->f0").name("Vertex ID")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on in-edges");
case OUT:
return edges.map(new ProjectVertexIdMap<K, EV>(0))
- .withForwardedFields("f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+ .withForwardedFields("f0").name("Vertex ID")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on out-edges");
case ALL:
- return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
+ return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on in- and out-edges");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1045,16 +1105,19 @@ public class Graph<K, VV, EV> {
switch (direction) {
case IN:
- return edges.map(new ProjectVertexIdMap<K, EV>(1))
+ return edges.map(new ProjectVertexIdMap<K, EV>(1)).name("Vertex ID")
.withForwardedFields("f1->f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
- return edges.map(new ProjectVertexIdMap<K, EV>(0))
+ return edges.map(new ProjectVertexIdMap<K, EV>(0)).name("Vertex ID")
.withForwardedFields("f0")
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
- return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
+ return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+ .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
+ .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1220,20 +1283,25 @@ public class Graph<K, VV, EV> {
@ForwardedFields("f0->f1; f1->f0; f2")
private static final class ReverseEdgesMap<K, EV>
- implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
+ implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
+ public Edge<K, EV> output = new Edge<>();
- public Edge<K, EV> map(Edge<K, EV> value) {
- return new Edge<>(value.f1, value.f0, value.f2);
+ public Edge<K, EV> map(Edge<K, EV> edge) {
+ output.setFields(edge.f1, edge.f0, edge.f2);
+ return output;
}
}
private static final class RegularAndReversedEdgesMap<K, EV>
- implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+ implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+ public Edge<K, EV> output = new Edge<>();
@Override
public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
- out.collect(new Edge<>(edge.f0, edge.f1, edge.f2));
- out.collect(new Edge<>(edge.f1, edge.f0, edge.f2));
+ out.collect(edge);
+
+ output.setFields(edge.f1, edge.f0, edge.f2);
+ out.collect(output);
}
}
@@ -1244,7 +1312,7 @@ public class Graph<K, VV, EV> {
* @throws UnsupportedOperationException
*/
public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
- DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>());
+ DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>()).name("Reverse edges");
return new Graph<>(vertices, reversedEdges, this.context);
}
@@ -1266,7 +1334,7 @@ public class Graph<K, VV, EV> {
* @return The IDs of the vertices as DataSet
*/
public DataSet<K> getVertexIds() {
- return vertices.map(new ExtractVertexIDMapper<K, VV>());
+ return vertices.map(new ExtractVertexIDMapper<K, VV>()).name("Vertex IDs");
}
private static final class ExtractVertexIDMapper<K, VV>
@@ -1281,7 +1349,7 @@ public class Graph<K, VV, EV> {
* @return The IDs of the edges as DataSet
*/
public DataSet<Tuple2<K, K>> getEdgeIds() {
- return edges.map(new ExtractEdgeIDsMapper<K, EV>());
+ return edges.map(new ExtractEdgeIDsMapper<K, EV>()).name("Edge IDs");
}
@ForwardedFields("f0; f1")
@@ -1317,7 +1385,7 @@ public class Graph<K, VV, EV> {
public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
// Add the vertices
DataSet<Vertex<K, VV>> newVertices = this.vertices.coGroup(this.context.fromCollection(verticesToAdd))
- .where(0).equalTo(0).with(new VerticesUnionCoGroup<K, VV>());
+ .where(0).equalTo(0).with(new VerticesUnionCoGroup<K, VV>()).name("Add vertices");
return new Graph<>(newVertices, this.edges, this.context);
}
@@ -1371,9 +1439,9 @@ public class Graph<K, VV, EV> {
DataSet<Edge<K,EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
.where(0).equalTo(0)
- .with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>())
+ .with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>()).name("Join with source")
.join(this.getVertices()).where(1).equalTo(0)
- .with(new JoinWithVerticesOnTrg<K, VV, EV>());
+ .with(new JoinWithVerticesOnTrg<K, VV, EV>()).name("Join with target");
return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context);
}
@@ -1435,14 +1503,14 @@ public class Graph<K, VV, EV> {
private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
- .with(new VerticesRemovalCoGroup<K, VV>());
+ .with(new VerticesRemovalCoGroup<K, VV>()).name("Remove vertices");
DataSet <Edge< K, EV>> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
// if the edge source was removed, the edge will also be removed
- .with(new ProjectEdgeToBeRemoved<K, VV, EV>())
+ .with(new ProjectEdgeToBeRemoved<K, VV, EV>()).name("Edges to be removed")
// if the edge target was removed, the edge will also be removed
.join(newVertices).where(1).equalTo(0)
- .with(new ProjectEdge<K, VV, EV>());
+ .with(new ProjectEdge<K, VV, EV>()).name("Remove edges");
return new Graph<>(newVertices, newEdges, context);
}
@@ -1466,8 +1534,6 @@ public class Graph<K, VV, EV> {
}
}
-
-
@ForwardedFieldsSecond("f0; f1; f2")
private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
@Override
@@ -1484,7 +1550,7 @@ public class Graph<K, VV, EV> {
* the removed edges
*/
public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
- DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge));
+ DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge)).name("Remove edge");
return new Graph<>(this.vertices, newEdges, this.context);
}
@@ -1512,7 +1578,7 @@ public class Graph<K, VV, EV> {
public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
- .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
+ .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
return new Graph<>(this.vertices, newEdges, context);
}
@@ -1538,8 +1604,18 @@ public class Graph<K, VV, EV> {
* @return a new graph
*/
public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
- DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
- DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
+ DataSet<Vertex<K, VV>> unionedVertices = graph
+ .getVertices()
+ .union(this.getVertices())
+ .name("Vertices")
+ .distinct()
+ .name("Vertices");
+
+ DataSet<Edge<K, EV>> unionedEdges = graph
+ .getEdges()
+ .union(this.getEdges())
+ .name("Edges");
+
return new Graph<>(unionedVertices, unionedEdges, this.context);
}
@@ -1603,8 +1679,9 @@ public class Graph<K, VV, EV> {
public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
return first;
}
- }).withForwardedFieldsFirst("*")
- .distinct();
+ }).withForwardedFieldsFirst("*").name("Intersect edges")
+ .distinct()
+ .name("Edges");
}
/**
@@ -1619,7 +1696,8 @@ public class Graph<K, VV, EV> {
.coGroup(edges)
.where(0, 1, 2)
.equalTo(0, 1, 2)
- .with(new MatchingEdgeReducer<K, EV>());
+ .with(new MatchingEdgeReducer<K, EV>())
+ .name("Intersect edges");
}
/**
@@ -1827,27 +1905,27 @@ public class Graph<K, VV, EV> {
case IN:
// create <edge-sourceVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
- .join(this.vertices).where(0).equalTo(0);
+ .join(this.vertices).where(0).equalTo(0).name("Edge with source vertex");
return vertices.coGroup(edgesWithSources)
.where(0).equalTo("f0.f1")
- .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function");
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
- .join(this.vertices).where(1).equalTo(0);
+ .join(this.vertices).where(1).equalTo(0).name("Edge with target vertex");
return vertices.coGroup(edgesWithTargets)
.where(0).equalTo("f0.f0")
- .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function");
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
- .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex");
return vertices.coGroup(edgesWithNeighbors)
.where(0).equalTo(0)
- .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction));
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).name("Neighbors function");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1875,27 +1953,30 @@ public class Graph<K, VV, EV> {
case IN:
// create <edge-sourceVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
- .join(this.vertices).where(0).equalTo(0);
+ .join(this.vertices).where(0).equalTo(0).name("Edge with source vertex");
return vertices.coGroup(edgesWithSources)
.where(0).equalTo("f0.f1")
- .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
- .join(this.vertices).where(1).equalTo(0);
+ .join(this.vertices).where(1).equalTo(0).name("Edge with target vertex");
return vertices.coGroup(edgesWithTargets)
.where(0).equalTo("f0.f0")
- .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
- .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex");
return vertices.coGroup(edgesWithNeighbors)
.where(0).equalTo(0)
- .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo);
+ .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1924,26 +2005,26 @@ public class Graph<K, VV, EV> {
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
.join(this.vertices).where(0).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(1))
- .withForwardedFieldsFirst("f1->f0");
+ .withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID");
return edgesWithSources.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(0))
- .withForwardedFieldsFirst("f0");
+ .withForwardedFieldsFirst("f0").name("Edge with target vertex ID");
return edgesWithTargets.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
- .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>()).name("Forward and reverse edges")
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex ID");
return edgesWithNeighbors.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -1973,26 +2054,29 @@ public class Graph<K, VV, EV> {
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
.join(this.vertices).where(0).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(1))
- .withForwardedFieldsFirst("f1->f0");
+ .withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID");
return edgesWithSources.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(0))
- .withForwardedFieldsFirst("f0");
+ .withForwardedFieldsFirst("f0").name("Edge with target vertex ID");
return edgesWithTargets.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>()).name("Edge with vertex ID");
return edgesWithNeighbors.groupBy(0).reduceGroup(
- new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
+ new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
+ .name("Neighbors function").returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -2170,26 +2254,26 @@ public class Graph<K, VV, EV> {
final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
.join(this.vertices).where(0).equalTo(0)
.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1))
- .withForwardedFieldsFirst("f1->f0");
+ .withForwardedFieldsFirst("f1->f0").name("Vertex with in-neighbor value");
return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
- reduceNeighborsFunction));
+ reduceNeighborsFunction)).name("Neighbors function");
case OUT:
// create <vertex-target value> pairs
DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0))
- .withForwardedFieldsFirst("f0");
+ .withForwardedFieldsFirst("f0").name("Vertex with out-neighbor value");
return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
- reduceNeighborsFunction));
+ reduceNeighborsFunction)).name("Neighbors function");
case ALL:
// create <vertex-neighbor value> pairs
DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectNeighborValue<K, VV, EV>());
+ .with(new ProjectNeighborValue<K, VV, EV>()).name("Vertex with neighbor value");
return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
- reduceNeighborsFunction));
+ reduceNeighborsFunction)).name("Neighbors function");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
@@ -2231,15 +2315,21 @@ public class Graph<K, VV, EV> {
case IN:
return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
.withForwardedFields("f1->f0")
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ .name("Vertex with in-edges")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .name("Reduce on edges");
case OUT:
return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
.withForwardedFields("f0->f0")
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ .name("Vertex with out-edges")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .name("Reduce on edges");
case ALL:
return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, EV>())
.withForwardedFields("f2->f1")
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
+ .name("Vertex with all edges")
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .name("Reduce on edges");
default:
throw new IllegalArgumentException("Illegal edge direction");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index 4859e12..6547f9a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -20,12 +20,14 @@ package org.apache.flink.graph;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.graph.utils.Tuple2ToEdgeMap;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.types.NullValue;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Preconditions;
/**
@@ -44,9 +46,6 @@ public class GraphCsvReader {
protected CsvReader edgeReader;
protected CsvReader vertexReader;
protected MapFunction<?, ?> mapper;
- protected Class<?> vertexKey;
- protected Class<?> vertexValue;
- protected Class<?> edgeValue;
//--------------------------------------------------------------------------------------------------------------------
public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
@@ -104,17 +103,18 @@ public class GraphCsvReader {
public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
Class<EV> edgeValue) {
- DataSet<Tuple2<K, VV>> vertices;
-
if (edgeReader == null) {
- throw new RuntimeException("The edges input file cannot be null!");
+ throw new RuntimeException("The edge input file cannot be null!");
}
DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
// the vertex value can be provided by an input file or a user-defined mapper
if (vertexReader != null) {
- vertices = vertexReader.types(vertexKey, vertexValue);
+ DataSet<Tuple2<K, VV>> vertices = vertexReader
+ .types(vertexKey, vertexValue)
+ .name(GraphCsvReader.class.getName());
+
return Graph.fromTupleDataSet(vertices, edges, executionContext);
}
else if (mapper != null) {
@@ -135,10 +135,12 @@ public class GraphCsvReader {
public <K, EV> Graph<K, NullValue, EV> edgeTypes(Class<K> vertexKey, Class<EV> edgeValue) {
if (edgeReader == null) {
- throw new RuntimeException("The edges input file cannot be null!");
+ throw new RuntimeException("The edge input file cannot be null!");
}
- DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+ DataSet<Tuple3<K, K, EV>> edges = edgeReader
+ .types(vertexKey, vertexKey, edgeValue)
+ .name(GraphCsvReader.class.getName());
return Graph.fromTupleDataSet(edges, executionContext);
}
@@ -151,20 +153,16 @@ public class GraphCsvReader {
public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
if (edgeReader == null) {
- throw new RuntimeException("The edges input file cannot be null!");
+ throw new RuntimeException("The edge input file cannot be null!");
}
- DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
- .map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
-
- private static final long serialVersionUID = -2981792951286476970L;
+ DataSet<Edge<K, NullValue>> edges = edgeReader
+ .types(vertexKey, vertexKey)
+ .name(GraphCsvReader.class.getName())
+ .map(new Tuple2ToEdgeMap<K>())
+ .name("Type conversion");
- public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
- return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- }).withForwardedFields("f0;f1");
-
- return Graph.fromTupleDataSet(edges, executionContext);
+ return Graph.fromDataSet(edges, executionContext);
}
/**
@@ -178,28 +176,29 @@ public class GraphCsvReader {
*/
@SuppressWarnings({ "serial", "unchecked" })
public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
-
- DataSet<Tuple2<K, VV>> vertices;
if (edgeReader == null) {
- throw new RuntimeException("The edges input file cannot be null!");
+ throw new RuntimeException("The edge input file cannot be null!");
}
- DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
- .map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
-
- public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
- return new Tuple3<>(input.f0, input.f1, NullValue.getInstance());
- }
- }).withForwardedFields("f0;f1");
+ DataSet<Edge<K, NullValue>> edges = edgeReader
+ .types(vertexKey, vertexKey)
+ .name(GraphCsvReader.class.getName())
+ .map(new Tuple2ToEdgeMap<K>())
+ .name("To Edge");
// the vertex value can be provided by an input file or a user-defined mapper
if (vertexReader != null) {
- vertices = vertexReader.types(vertexKey, vertexValue);
- return Graph.fromTupleDataSet(vertices, edges, executionContext);
+ DataSet<Vertex<K, VV>> vertices = vertexReader
+ .types(vertexKey, vertexValue)
+ .name(GraphCsvReader.class.getName())
+ .map(new Tuple2ToVertexMap<K, VV>())
+ .name("Type conversion");
+
+ return Graph.fromDataSet(vertices, edges, executionContext);
}
else if (mapper != null) {
- return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+ return Graph.fromDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
}
else {
throw new RuntimeException("Vertex values have to be specified through a vertices input file"
http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
index 2eee6d7..7f37356 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -18,11 +18,13 @@
package org.apache.flink.graph.generator;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
public abstract class AbstractGraphGenerator<K, VV, EV>
implements GraphGenerator<K, VV, EV> {
// Optional configuration
- protected int parallelism = -1;
+ protected int parallelism = PARALLELISM_DEFAULT;
@Override
public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09e08173/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
new file mode 100644
index 0000000..5eb8287
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Create an Edge from a Tuple2.
+ *
+ * The new edge's value is set to {@link NullValue}.
+ *
+ * @param <K> edge ID type
+ */
+@ForwardedFields("f0; f1")
+public class Tuple2ToEdgeMap<K> implements MapFunction<Tuple2<K, K>, Edge<K, NullValue>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Edge<K, NullValue> edge = new Edge<>(null, null, NullValue.getInstance());
+
+ @Override
+ public Edge<K, NullValue> map(Tuple2<K, K> tuple) {
+ edge.f0 = tuple.f0;
+ edge.f1 = tuple.f1;
+ return edge;
+ }
+
+}