Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:10 UTC
[04/10] flink git commit: [FLINK-2857] [gelly] Improve Gelly API and
documentation. - Improve javadocs of Graph creation methods - Add fromTuple2
creation methods - Rename mapper parameters to vertexInitializer. - Improve
javadocs and parameter names of
[FLINK-2857] [gelly] Improve Gelly API and documentation.
- Improve javadocs of Graph creation methods
- Add fromTuple2 creation methods
- Rename mapper parameters to vertexInitializer.
- Improve javadocs and parameter names of joinWith* methods
- Improve javadocs of neighborhood methods
- Update docs to reflect api changes
This closes #1263
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/640e63be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/640e63be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/640e63be
Branch: refs/heads/master
Commit: 640e63beef0b60891178affc7a6e8f0d01a5d000
Parents: da248b1
Author: vasia <va...@apache.org>
Authored: Fri Oct 16 11:11:40 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 19 15:39:37 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 34 +-
.../org/apache/flink/graph/scala/Graph.scala | 372 ++++++++------
.../operations/GraphCreationWithCsvITCase.scala | 2 +-
.../test/operations/JoinWithEdgesITCase.scala | 7 +-
.../operations/JoinWithVerticesITCase.scala | 8 +-
.../apache/flink/graph/EdgeJoinFunction.java | 45 ++
.../org/apache/flink/graph/EdgesFunction.java | 19 +
.../graph/EdgesFunctionWithVertexValue.java | 20 +-
.../main/java/org/apache/flink/graph/Graph.java | 498 +++++++++++--------
.../apache/flink/graph/NeighborsFunction.java | 19 +
.../graph/NeighborsFunctionWithVertexValue.java | 20 +
.../apache/flink/graph/ReduceEdgesFunction.java | 9 +
.../flink/graph/ReduceNeighborsFunction.java | 9 +
.../apache/flink/graph/VertexJoinFunction.java | 43 ++
.../graph/example/EuclideanGraphWeighing.java | 8 +-
.../graph/example/JaccardSimilarityMeasure.java | 8 +-
.../flink/graph/example/MusicProfiles.java | 7 +-
.../apache/flink/graph/library/GSAPageRank.java | 11 +-
.../flink/graph/library/GSATriangleCount.java | 10 +-
.../apache/flink/graph/library/PageRank.java | 11 +-
.../test/operations/GraphCreationITCase.java | 59 +++
.../test/operations/JoinWithEdgesITCase.java | 41 +-
.../test/operations/JoinWithVerticesITCase.java | 40 +-
23 files changed, 887 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 13d304d..646ec7f 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -149,6 +149,30 @@ val graph = Graph.fromDataSet(vertices, edges, env)
</div>
</div>
+* from a `DataSet` of `Tuple2` representing the edges. Gelly will convert each `Tuple2` to an `Edge`, where the first field will be the source ID and the second field will be the target ID. Both vertex and edge values will be set to `NullValue`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<String, String>> edges = ...
+
+Graph<String, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val edges: DataSet[(String, String)] = ...
+
+val graph = Graph.fromTuple2DataSet(edges, env)
+{% endhighlight %}
+</div>
+</div>
+
* from a `DataSet` of `Tuple3` and an optional `DataSet` of `Tuple2`. In this case, Gelly will convert each `Tuple3` to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each `Tuple2` will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value:
<div class="codetabs" markdown="1">
@@ -424,8 +448,8 @@ graph.subgraph((vertex => vertex.getValue > 0), (edge => edge.getValue < 0))
<img alt="Filter Transformations" width="80%" src="fig/gelly-filter.png"/>
</p>
-* <strong>Join</strong>: Gelly provides specialized methods for joining the vertex and edge datasets with other input datasets. `joinWithVertices` joins the vertices with a `Tuple2` input data set. The join is performed using the vertex ID and the first field of the `Tuple2` input as the join keys. The method returns a new `Graph` where the vertex values have been updated according to a provided user-defined map function.
-Similarly, an input dataset can be joined with the edges, using one of three methods. `joinWithEdges` expects an input `DataSet` of `Tuple3` and joins on the composite key of both source and target vertex IDs. `joinWithEdgesOnSource` expects a `DataSet` of `Tuple2` and joins on the source key of the edges and the first attribute of the input dataset and `joinWithEdgesOnTarget` expects a `DataSet` of `Tuple2` and joins on the target key of the edges and the first attribute of the input dataset. All three methods apply a map function on the edge and the input data set values.
+* <strong>Join</strong>: Gelly provides specialized methods for joining the vertex and edge datasets with other input datasets. `joinWithVertices` joins the vertices with a `Tuple2` input dataset. The join is performed using the vertex ID and the first field of the `Tuple2` input as the join keys. The method returns a new `Graph` where the vertex values have been updated according to a provided user-defined transformation function.
+Similarly, an input dataset can be joined with the edges, using one of three methods. `joinWithEdges` expects an input `DataSet` of `Tuple3` and joins on the composite key of both source and target vertex IDs. `joinWithEdgesOnSource` expects a `DataSet` of `Tuple2` and joins on the source key of the edges and the first field of the input dataset. `joinWithEdgesOnTarget` expects a `DataSet` of `Tuple2` and joins on the target key of the edges and the first field of the input dataset. All three methods apply a user-defined transformation function to the edge value and the matching input value.
Note that if the input dataset contains a key multiple times, all Gelly join methods will only consider the first value encountered.
<div class="codetabs" markdown="1">
@@ -437,9 +461,9 @@ DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
// assign the transition probabilities as the edge weights
Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
- new MapFunction<Tuple2<Double, Long>, Double>() {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
+ new EdgeJoinFunction<Double, Long>() {
+ public Double edgeJoin(Double edgeValue, Long inputValue) {
+ return edgeValue / inputValue;
}
});
{% endhighlight %}
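The join semantics described in the guide (join edges on their source ID, apply a function to the edge value and the matching input value, keep only the first value for a duplicated key) can be modeled without Flink. The following is a plain-Java sketch under stated assumptions, not Gelly's implementation: `EdgeJoinSketch`, `SimpleEdge`, `EdgeJoin`, `outDegrees`, and `joinOnSource` are hypothetical stand-ins for Gelly's `Edge`, `EdgeJoinFunction`, `outDegrees()`, and `joinWithEdgesOnSource`, and in-memory lists replace `DataSet`s. The sketch also assumes that edges without a matching key keep their original value. It reproduces the guide's example: each edge weight is divided by the out-degree of its source to obtain a transition probability.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of joinWithEdgesOnSource semantics (hypothetical names throughout).
final class EdgeJoinSketch {

    // Hypothetical stand-in for Gelly's Edge<Long, Double>.
    public static final class SimpleEdge {
        public final long source;
        public final long target;
        public final double value;

        public SimpleEdge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // Hypothetical stand-in for Gelly's EdgeJoinFunction<EV, T>.
    @FunctionalInterface
    public interface EdgeJoin<EV, T> {
        EV edgeJoin(EV edgeValue, T inputValue);
    }

    // Counts outgoing edges per source ID, roughly like the guide's network.outDegrees().
    public static List<Map.Entry<Long, Long>> outDegrees(List<SimpleEdge> edges) {
        Map<Long, Long> degrees = new LinkedHashMap<>();
        for (SimpleEdge e : edges) {
            degrees.merge(e.source, 1L, Long::sum);
        }
        return new ArrayList<>(degrees.entrySet());
    }

    // Joins edges on their source ID with (key, value) pairs. Only the first value seen
    // for each key is used; edges without a matching key keep their value (assumed here).
    public static List<SimpleEdge> joinOnSource(List<SimpleEdge> edges,
                                                List<Map.Entry<Long, Long>> input,
                                                EdgeJoin<Double, Long> fn) {
        Map<Long, Long> firstValue = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> pair : input) {
            firstValue.putIfAbsent(pair.getKey(), pair.getValue());
        }
        List<SimpleEdge> result = new ArrayList<>();
        for (SimpleEdge e : edges) {
            Long match = firstValue.get(e.source);
            double newValue = (match == null) ? e.value : fn.edgeJoin(e.value, match);
            result.add(new SimpleEdge(e.source, e.target, newValue));
        }
        return result;
    }
}
```

With edges 1→2, 1→3, and 2→3 all weighted 1.0, vertex 1 has out-degree 2 and vertex 2 has out-degree 1, so the weights become 0.5, 0.5, and 1.0.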
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 28f3f12..e51453e 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -54,12 +54,13 @@ object Graph {
/**
* Creates a graph from a DataSet of edges.
* Vertices are created automatically and their values are set by applying the provided
- * map function to the vertex ids.
+ * vertexValueInitializer map function to the vertex ids.
*/
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV],
- env: ExecutionEnvironment): Graph[K, VV, EV] = {
- wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
+ TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]],
+ vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
+ wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, vertexValueInitializer,
+ env.getJavaEnv))
}
/**
@@ -84,16 +85,22 @@ object Graph {
/**
* Creates a graph from a Seq of edges.
* Vertices are created automatically and their values are set by applying the provided
- * map function to the vertex ids.
+ * vertexValueInitializer map function to the vertex ids.
*/
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV],
- env: ExecutionEnvironment): Graph[K, VV, EV] = {
- wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
+ TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV],
+ env: ExecutionEnvironment): Graph[K, VV, EV] = {
+ wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, vertexValueInitializer,
+ env.getJavaEnv))
}
/**
- * Creates a Graph from a DataSets of Tuples.
+ * Creates a graph from DataSets of tuples for vertices and for edges.
+ * The first field of the Tuple2 vertex object will become the vertex ID
+ * and the second field will become the vertex value.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
*/
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
@@ -101,11 +108,14 @@ object Graph {
val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
- env.getJavaEnv))
+ env.getJavaEnv))
}
/**
* Creates a Graph from a DataSet of Tuples representing the edges.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
* Vertices are created automatically and their values are set to NullValue.
*/
def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
@@ -116,14 +126,45 @@ object Graph {
/**
* Creates a Graph from a DataSet of Tuples representing the edges.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
* Vertices are created automatically and their values are set by applying the provided
- * map function to the vertex ids.
+ * vertexValueInitializer map function to the vertex ids.
*/
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV],
- env: ExecutionEnvironment): Graph[K, VV, EV] = {
+ TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
+ vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
- wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
+ wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, vertexValueInitializer,
+ env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a DataSet of Tuple2 objects representing the edges.
+ * The first field of each Tuple2 will become the source ID and
+ * the second field will become the target ID. The edge value will be set to NullValue.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
+ def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
+ env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
+ val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+ wrapGraph(jg.Graph.fromTuple2DataSet[K](javaTupleEdges, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a DataSet of Tuple2 objects representing the edges.
+ * The first field of each Tuple2 will become the source ID and
+ * the second field will become the target ID. The edge value will be set to NullValue.
+ * Vertices are created automatically and their values are set by applying the provided
+ * vertexValueInitializer map function to the vertex IDs.
+ */
+ def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
+ (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
+ env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
+ val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+ wrapGraph(jg.Graph.fromTuple2DataSet[K, VV](javaTupleEdges, vertexValueInitializer,
+ env.getJavaEnv))
}
/**
@@ -160,7 +201,8 @@ object Graph {
* edges file.
* @param includedFieldsEdges The fields in the edges file that should be read.
* By default all fields are read.
- * @param mapper If no vertex values are provided, this mapper can be used to initialize them.
+ * @param vertexValueInitializer If no vertex values are provided,
+ * this mapper can be used to initialize them, by applying a map transformation on the vertex IDs.
*
*/
// scalastyle:off
@@ -186,7 +228,7 @@ object Graph {
ignoreCommentsEdges: String = null,
lenientEdges: Boolean = false,
includedFieldsEdges: Array[Int] = null,
- mapper: MapFunction[K, VV] = null) = {
+ vertexValueInitializer: MapFunction[K, VV] = null) = {
// with vertex and edge values
if (readVertices && hasEdgeValues) {
@@ -229,8 +271,8 @@ object Graph {
includedFieldsEdges)
// initializer provided
- if (mapper != null) {
- fromTupleDataSet[K, VV, EV](edges, mapper, env)
+ if (vertexValueInitializer != null) {
+ fromTupleDataSet[K, VV, EV](edges, vertexValueInitializer, env)
}
else {
fromTupleDataSet[K, EV](edges, env)
@@ -243,8 +285,8 @@ object Graph {
lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
// no initializer provided
- if (mapper != null) {
- fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
+ if (vertexValueInitializer != null) {
+ fromTupleDataSet[K, VV, NullValue](edges, vertexValueInitializer, env)
}
else {
fromTupleDataSet[K, NullValue](edges, env)
@@ -369,185 +411,215 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
- * Joins the vertex DataSet of this graph with an input DataSet and applies
- * a UDF on the resulted values.
- *
- * @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @return a new graph where the vertex values have been updated.
- */
- def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
- (VV, T), VV]): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
- override def map(value: jtuple.Tuple2[VV, T]): VV = {
- mapper.map((value.f0, value.f1))
- }
- }
+ * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+ * a user-defined transformation on the values of the matched records.
+ * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
+ *
+ * @param inputDataSet the Tuple2 DataSet to join with.
+ * The first field of the Tuple2 is used as the join key and the second field is passed
+ * as a parameter to the transformation function.
+ * @param vertexJoinFunction the transformation function to apply.
+ * The first parameter is the current vertex value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @return a new Graph, where the vertex values have been updated according to the
+ * result of the vertexJoinFunction.
+ *
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ */
+ def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+ vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, vertexJoinFunction))
}
/**
- * Joins the vertex DataSet of this graph with an input DataSet and applies
- * a UDF on the resulted values.
- *
- * @param inputDataSet the DataSet to join with.
- * @param fun the UDF map function to apply.
- * @return a new graph where the vertex values have been updated.
- */
+ * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+ * a user-defined transformation on the values of the matched records.
+ * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
+ *
+ * @param inputDataSet the Tuple2 DataSet to join with.
+ * The first field of the Tuple2 is used as the join key and the second field is passed
+ * as a parameter to the transformation function.
+ * @param fun the transformation function to apply.
+ * The first parameter is the current vertex value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @return a new Graph, where the vertex values have been updated according to the
+ * result of fun.
+ *
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ */
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+ val newVertexJoin = new VertexJoinFunction[VV, T]() {
val cleanFun = clean(fun)
- override def map(value: jtuple.Tuple2[VV, T]): VV = {
- cleanFun(value.f0, value.f1)
+ override def vertexJoin(vertexValue: VV, inputValue: T): VV = {
+ cleanFun(vertexValue, inputValue)
}
}
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newVertexJoin))
}
/**
- * Joins the edge DataSet with an input DataSet on a composite key of both
- * source and target and applies a UDF on the resulted values.
- *
+ * Joins the edge DataSet with an input DataSet on the composite key of both
+ * source and target IDs and applies a user-defined transformation on the values
+ * of the matched records. The first two fields of the input DataSet are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
- def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], mapper: MapFunction[
- (EV, T), EV]): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- mapper.map((value.f0, value.f1))
- }
- }
+ * The first two fields of the Tuple3 are used as the composite join key
+ * and the third field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple3 from the input DataSet.
+ *
+ * @tparam T the type of the third field of the input Tuple3 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
+ def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
+ edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
scalatuple._2, scalatuple._3)).javaSet
- wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, edgeJoinFunction))
}
/**
- * Joins the edge DataSet with an input DataSet on a composite key of both
- * source and target and applies a UDF on the resulted values.
- *
+ * Joins the edge DataSet with an input DataSet on the composite key of both
+ * source and target IDs and applies a user-defined transformation on the values
+ * of the matched records. The first two fields of the input DataSet are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param fun the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first two fields of the Tuple3 are used as the composite join key
+ * and the third field is passed as a parameter to the transformation function.
+ * @param fun the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple3 from the input DataSet.
+ *
+ * @tparam T the type of the third field of the input Tuple3 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of fun.
+ */
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
val cleanFun = clean(fun)
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- cleanFun(value.f0, value.f1)
+ override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+ cleanFun(edgeValue, inputValue)
}
}
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
scalatuple._2, scalatuple._3)).javaSet
- wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newEdgeJoin))
}
/**
- * Joins the edge DataSet with an input DataSet on the source key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. In case the inputDataSet contains the same key more
- * than once, only the first value will be considered.
- *
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The source ID of the edges input and the first field of the input DataSet
+ * are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
- def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
- MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- mapper.map((value.f0, value.f1))
- }
- }
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
+ def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+ edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, edgeJoinFunction))
}
/**
- * Joins the edge DataSet with an input DataSet on the source key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. In case the inputDataSet contains the same key more
- * than once, only the first value will be considered.
- *
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The source ID of the edges input and the first field of the input DataSet
+ * are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param fun the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param fun the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of fun.
+ */
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
EV): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
val cleanFun = clean(fun)
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- cleanFun(value.f0, value.f1)
+ override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+ cleanFun(edgeValue, inputValue)
}
}
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newEdgeJoin))
}
/**
- * Joins the edge DataSet with an input DataSet on the target key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. Should the inputDataSet contain the same key more
- * than once, only the first value will be considered.
- *
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The target ID of the edges input and the first field of the input DataSet
+ * are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
- def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
- MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- mapper.map((value.f0, value.f1))
- }
- }
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
+ def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+ edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, edgeJoinFunction))
}
/**
- * Joins the edge DataSet with an input DataSet on the target key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. Should the inputDataSet contain the same key more
- * than once, only the first value will be considered.
- *
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The target ID of the edges input and the first field of the input DataSet
+ * are used as join keys.
+ *
* @param inputDataSet the DataSet to join with.
- * @param fun the UDF map function to apply.
- * @tparam T the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param fun the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @tparam T the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of fun.
+ */
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
EV): Graph[K, VV, EV] = {
- val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+ val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
val cleanFun = clean(fun)
- override def map(value: jtuple.Tuple2[EV, T]): EV = {
- cleanFun(value.f0, value.f1)
+ override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+ cleanFun(edgeValue, inputValue)
}
}
val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
- wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+ wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newEdgeJoin))
}
/**
@@ -896,12 +968,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
- * Compute an aggregate over the neighbor values of each
- * vertex.
- *
- * @param reduceNeighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
- * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+ * Compute a reduce transformation over the neighbors' vertex values of each vertex.
+ * For each vertex, the transformation consecutively calls a
+ * {@link ReduceNeighborsFunction} until only a single value for each vertex remains.
+ * The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values
+ * into one new value of the same type.
+ *
+ * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
+ * @param direction the edge direction (in-, out-, all-)
+ * @return a DataSet of Tuple2, with one tuple per vertex.
+ * The first field of the Tuple2 is the vertex ID and the second field
+ * is the aggregate value computed by the provided {@link ReduceNeighborsFunction}.
*/
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
EdgeDirection): DataSet[(K, VV)] = {
@@ -910,13 +987,18 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
- * Compute an aggregate over the edge values of each vertex.
- *
- * @param reduceEdgesFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
- * @return a Dataset containing one value per vertex(vertex key, aggegate edge value)
- * @throws IllegalArgumentException
- */
+ * Compute a reduce transformation over the edge values of each vertex.
+ * For each vertex, the transformation consecutively calls a
+ * {@link ReduceEdgesFunction} until only a single value for each vertex remains.
+ * The {@link ReduceEdgesFunction} combines two edge values into one new value
+ * of the same type.
+ *
+ * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-)
+ * @return a DataSet of Tuple2, with one tuple per vertex.
+ * The first field of the Tuple2 is the vertex ID and the second field
+ * is the aggregate edge value computed by the provided {@link ReduceEdgesFunction}.
+ */
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
DataSet[(K, EV)] = {
wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
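The pairwise-reduce semantics described in the new reduceOnNeighbors docs can be illustrated without Flink. The following is a minimal plain-Java sketch, not Gelly code: it assumes edges given as hypothetical long[]{source, target} pairs and vertex values in a Map, covers only the EdgeDirection.OUT case, and uses a BinaryOperator in place of ReduceNeighborsFunction. The class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

class ReduceOnNeighborsSketch {

    // For each vertex that has at least one out-neighbor, combine the neighbors'
    // values pairwise with `combine` until one value remains (OUT direction only).
    static Map<Long, Double> reduceOnOutNeighbors(Map<Long, Double> vertexValues,
                                                  List<long[]> edges,
                                                  BinaryOperator<Double> combine) {
        Map<Long, Double> result = new TreeMap<>();
        for (long[] edge : edges) {
            long source = edge[0];
            Double neighborValue = vertexValues.get(edge[1]);
            // merge() stores the value for a new key, otherwise calls combine(old, new)
            result.merge(source, neighborValue, combine);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Double> values = new HashMap<>();
        values.put(1L, 1.0);
        values.put(2L, 2.0);
        values.put(3L, 3.0);
        List<long[]> edges = Arrays.asList(new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 3});
        // Vertex 3 has no out-neighbors, so it produces no entry in this sketch.
        System.out.println(reduceOnOutNeighbors(values, edges, Double::min)); // {1=2.0, 2=3.0}
    }
}
```

Because the combine step must return the same type it receives, the result type is fixed to the vertex value type, which matches the DataSet[(K, VV)] return type of reduceOnNeighbors above.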
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
index 6ceaf16..a963845 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -101,7 +101,7 @@ MultipleProgramsTestBase(mode) {
val graph = Graph.fromCsvReader[Long, Double, Long](
readVertices = false,
pathEdges = edgesSplit.getPath.toString,
- mapper = new VertexDoubleIdAssigner(),
+ vertexValueInitializer = new VertexDoubleIdAssigner(),
env = env)
val result = graph.getTriplets.collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index 3dc90fc..83fa61b 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -30,6 +30,7 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
+import org.apache.flink.graph.EdgeJoinFunction
@RunWith(classOf[Parameterized])
class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
@@ -127,10 +128,10 @@ MultipleProgramsTestBase(mode) {
}
- final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+ final class AddValuesMapper extends EdgeJoinFunction[Long, Long] {
@throws(classOf[Exception])
- def map(tuple: (Long, Long)): Long = {
- tuple._1 + tuple._2
+ def edgeJoin(edgeValue: Long, inputValue: Long): Long = {
+ edgeValue + inputValue
}
}
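As the updated test shows, functions written against the old MapFunction<Tuple2<EV, T>, EV> parameter have to be rewritten to the two-argument EdgeJoinFunction<EV, T> shape. Where a lot of such code exists, a thin adapter may help during migration. The sketch below is hypothetical: Pair, OldTupleMapper and EdgeJoin are local stand-ins for Flink's Tuple2, MapFunction and EdgeJoinFunction so that it compiles without Flink; it is not part of the Gelly API.

```java
// Hypothetical stand-in for Flink's Tuple2<A, B>.
final class Pair<A, B> {
    final A f0;
    final B f1;
    Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
}

// Stand-in for the old parameter type MapFunction<Tuple2<EV, T>, EV>.
interface OldTupleMapper<EV, T> {
    EV map(Pair<EV, T> value) throws Exception;
}

// Stand-in with the same method shape as EdgeJoinFunction#edgeJoin.
interface EdgeJoin<EV, T> {
    EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
}

final class EdgeJoinAdapters {

    // Wraps an old-style tuple mapper so it can be passed where the new
    // two-argument join function is expected.
    static <EV, T> EdgeJoin<EV, T> fromTupleMapper(OldTupleMapper<EV, T> old) {
        return (edgeValue, inputValue) -> old.map(new Pair<>(edgeValue, inputValue));
    }

    public static void main(String[] args) throws Exception {
        OldTupleMapper<Long, Long> addValues = t -> t.f0 + t.f1; // logic of the old AddValuesMapper
        EdgeJoin<Long, Long> join = fromTupleMapper(addValues);
        System.out.println(join.edgeJoin(12L, 13L)); // 25
    }
}
```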
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 98ee8b6..f2beb7b 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -29,6 +29,7 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
+import org.apache.flink.graph.VertexJoinFunction
@RunWith(classOf[Parameterized])
class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
@@ -63,11 +64,10 @@ MultipleProgramsTestBase(mode) {
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
-
- final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+ final class AddValuesMapper extends VertexJoinFunction[Long, Long] {
@throws(classOf[Exception])
- def map(tuple: (Long, Long)): Long = {
- tuple._1 + tuple._2
+ def vertexJoin(vertexValue: Long, inputValue: Long): Long = {
+ vertexValue + inputValue
}
}
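The join semantics that joinWithVertices implements with a coGroup (see ApplyCoGroupToVertexValues in the Graph.java diff) can be modelled with plain collections. This is a sketch under stated assumptions, not Gelly code: vertices are a Map from ID to value, the input is a list of (ID, value) entries, and a BiFunction stands in for VertexJoinFunction. Following the coGroup in the diff, at most one input value per ID is used; the sketch keeps the first one in list order, whereas Flink makes no ordering guarantee.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

class JoinWithVerticesSketch {

    // Vertices whose ID occurs in the input get join(vertexValue, inputValue);
    // all other vertices keep their value, and input IDs without a vertex are dropped.
    static <VV, T> Map<Long, VV> joinWithVertices(Map<Long, VV> vertices,
                                                  List<Map.Entry<Long, T>> input,
                                                  BiFunction<VV, T, VV> join) {
        Map<Long, T> inputById = new HashMap<>();
        for (Map.Entry<Long, T> e : input) {
            inputById.putIfAbsent(e.getKey(), e.getValue()); // keep only one value per ID
        }
        Map<Long, VV> result = new TreeMap<>();
        for (Map.Entry<Long, VV> v : vertices.entrySet()) {
            Long id = v.getKey();
            result.put(id, inputById.containsKey(id)
                    ? join.apply(v.getValue(), inputById.get(id))
                    : v.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> vertices = new TreeMap<>();
        vertices.put(1L, 10L);
        vertices.put(2L, 20L);
        vertices.put(3L, 30L);
        List<Map.Entry<Long, Long>> input = Arrays.asList(
                new SimpleEntry<>(1L, 1L), new SimpleEntry<>(3L, 3L), new SimpleEntry<>(4L, 4L));
        // Same logic as the AddValuesMapper test: vertexValue + inputValue.
        System.out.println(joinWithVertices(vertices, input, Long::sum)); // {1=11, 2=20, 3=33}
    }
}
```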
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
new file mode 100644
index 0000000..68d6e53
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface to be implemented by the transformation function
+ * applied in {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)},
+ * {@link Graph#joinWithEdgesOnSource(DataSet, EdgeJoinFunction)}, and
+ * {@link Graph#joinWithEdgesOnTarget(DataSet, EdgeJoinFunction)} methods.
+ *
+ * @param <EV> the edge value type
+ * @param <T> the input value type
+ */
+public interface EdgeJoinFunction<EV, T> extends Function, Serializable {
+
+ /**
+ * Applies a transformation on the current edge value
+ * and the value of the matched tuple of the input DataSet.
+ *
+ * @param edgeValue the current edge value
+ * @param inputValue the value of the matched Tuple2 input
+ * @return the new edge value
+ */
+ EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
+}
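To make the composite-key behaviour of joinWithEdges concrete, here is a minimal plain-Java sketch. It assumes edges and input records are both hypothetical long[]{source, target, value} arrays, that the graph has no parallel edges, and that a BiFunction plays the role of EdgeJoinFunction; one input value per key is kept (first in list order). It mirrors the where(0, 1).equalTo(0, 1) key shown in the Graph.java diff but is not Gelly code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class JoinWithEdgesSketch {

    // An edge is updated only if the input has a record with the same
    // (source, target) pair; unmatched edges keep their value.
    static List<String> joinWithEdges(List<long[]> edges, List<long[]> input,
                                      BiFunction<Long, Long, Long> edgeJoin) {
        Map<List<Long>, Long> inputByKey = new HashMap<>();
        for (long[] in : input) {
            inputByKey.putIfAbsent(Arrays.asList(in[0], in[1]), in[2]);
        }
        List<String> result = new ArrayList<>();
        for (long[] e : edges) {
            Long in = inputByKey.get(Arrays.asList(e[0], e[1]));
            long value = (in == null) ? e[2] : edgeJoin.apply(e[2], in);
            result.add("(" + e[0] + "," + e[1] + "," + value + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2, 10}, new long[] {2, 3, 20}, new long[] {1, 3, 30});
        // (3, 1) does not match edge (1, 3): the key is ordered (source, target).
        List<long[]> input = Arrays.asList(new long[] {1, 2, 5}, new long[] {3, 1, 7});
        System.out.println(joinWithEdges(edges, input, Long::sum));
        // [(1,2,15), (2,3,20), (1,3,30)]
    }
}
```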
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index bf1d6a2..07e14e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -34,5 +34,24 @@ import org.apache.flink.util.Collector;
*/
public interface EdgesFunction<K, EV, O> extends Function, Serializable {
+ /**
+ * This method is called per vertex and can iterate over all of its neighboring edges
+ * with the specified direction.
+ * <p>
+ * If called with {@link EdgeDirection#OUT} the group will contain
+ * the out-edges of the grouping vertex.
+ * If called with {@link EdgeDirection#IN} the group will contain
+ * the in-edges of the grouping vertex.
+ * If called with {@link EdgeDirection#ALL} the group will contain
+ * all edges of the grouping vertex.
+ * <p>
+ * The method can emit any number of output elements, including none.
+ *
+ * @param edges the neighboring edges of the grouping vertex.
+ * The first field of each Tuple2 is the ID of the grouping vertex.
+ * The second field is the neighboring edge.
+ * @param out the collector to emit results to
+ * @throws Exception
+ */
void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
}
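The grouping described in the new iterateEdges javadoc (out-edges for OUT, in-edges for IN, both for ALL) can be sketched in plain Java. This is an illustration under simplifying assumptions, not Gelly code: EdgesFn is a hypothetical stand-in for EdgesFunction that receives the grouping vertex ID as a separate argument instead of as the first field of each Tuple2, edges are long[]{source, target} arrays, and a List stands in for the Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IterateEdgesSketch {

    enum Direction { IN, OUT, ALL }

    // Simplified stand-in for EdgesFunction#iterateEdges.
    interface EdgesFn<O> {
        void iterateEdges(long vertexId, List<long[]> edges, List<O> out);
    }

    // Groups edges by the grouping vertex for the given direction and calls
    // fn once per group; fn may add zero or more elements to `out`.
    static <O> List<O> groupReduceOnEdges(List<long[]> edges, Direction direction, EdgesFn<O> fn) {
        Map<Long, List<long[]>> groups = new TreeMap<>();
        for (long[] e : edges) {
            if (direction != Direction.IN) {   // OUT or ALL: out-edge of its source
                groups.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e);
            }
            if (direction != Direction.OUT) {  // IN or ALL: in-edge of its target
                groups.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e);
            }
        }
        List<O> out = new ArrayList<>();
        groups.forEach((id, group) -> fn.iterateEdges(id, group, out));
        return out;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 3});
        EdgesFn<String> degree = (id, group, out) -> out.add(id + ":" + group.size());
        System.out.println(groupReduceOnEdges(edges, Direction.OUT, degree)); // [1:2, 2:1]
        System.out.println(groupReduceOnEdges(edges, Direction.IN, degree));  // [2:1, 3:2]
        System.out.println(groupReduceOnEdges(edges, Direction.ALL, degree)); // [1:2, 2:2, 3:2]
    }
}
```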
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index 0b0ab0e..645bd7c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -35,5 +35,23 @@ import org.apache.flink.util.Collector;
*/
public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
- void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
+ /**
+ * This method is called per vertex and can iterate over all of its neighboring edges
+ * with the specified direction.
+ * <p>
+ * If called with {@link EdgeDirection#OUT} the group will contain
+ * the out-edges of the grouping vertex.
+ * If called with {@link EdgeDirection#IN} the group will contain
+ * the in-edges of the grouping vertex.
+ * If called with {@link EdgeDirection#ALL} the group will contain
+ * all edges of the grouping vertex.
+ * <p>
+ * The method can emit any number of output elements, including none.
+ *
+ * @param vertex the grouping vertex
+ * @param edges the neighboring edges of the grouping vertex.
+ * @param out the collector to emit results to
+ * @throws Exception
+ */
+ void iterateEdges(Vertex<K, VV> vertex, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
}
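The variant with vertex value differs in that the function also sees the grouping vertex's value, and, as the javadoc says, it may emit any number of elements, including none. A minimal plain-Java sketch, assuming a hypothetical EdgesFnWithValue stand-in for EdgesFunctionWithVertexValue, long vertex values, long[]{source, target, weight} edges and the OUT direction only. In this sketch vertices without out-edges are simply skipped; the diff does not state how Gelly treats them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EdgesWithVertexValueSketch {

    // Simplified stand-in for EdgesFunctionWithVertexValue#iterateEdges:
    // receives the grouping vertex (ID and value) and its out-edges.
    interface EdgesFnWithValue<O> {
        void iterateEdges(long vertexId, long vertexValue, List<long[]> edges, List<O> out);
    }

    // Calls fn once per vertex that has out-edges.
    // Assumes every edge source has an entry in `vertices`.
    static <O> List<O> groupReduceOnOutEdges(Map<Long, Long> vertices, List<long[]> edges,
                                             EdgesFnWithValue<O> fn) {
        Map<Long, List<long[]>> outEdges = new TreeMap<>();
        for (long[] e : edges) {
            outEdges.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e);
        }
        List<O> out = new ArrayList<>();
        outEdges.forEach((id, group) -> fn.iterateEdges(id, vertices.get(id), group, out));
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Long> thresholds = new TreeMap<>();
        thresholds.put(1L, 5L);
        thresholds.put(2L, 50L);
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2, 10}, new long[] {1, 3, 3}, new long[] {2, 3, 20});
        // Emit only the out-edges heavier than the vertex's own value;
        // vertex 2 has no such edge and therefore emits nothing.
        EdgesFnWithValue<String> heavyEdges = (id, value, group, out) -> {
            for (long[] e : group) {
                if (e[2] > value) {
                    out.add(e[0] + "->" + e[1]);
                }
            }
        };
        System.out.println(groupReduceOnOutEdges(thresholds, edges, heavyEdges)); // [1->2]
    }
}
```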
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index b24f749..6015be4 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -111,11 +111,11 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a Collection of edges, vertices are induced from the
- * edges. Vertices are created automatically and their values are set to
+ * Creates a graph from a Collection of edges.
+ * Vertices are created automatically and their values are set to
* NullValue.
*
- * @param edges a Collection of vertices.
+ * @param edges a Collection of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
@@ -126,20 +126,20 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a Collection of edges, vertices are induced from the
- * edges and vertex values are calculated by a mapper function. Vertices are
- * created automatically and their values are set by applying the provided
- * map function to the vertex ids.
+ * Creates a graph from a Collection of edges.
+ * Vertices are created automatically and their values are set
+ * by applying the provided map function to the vertex IDs.
*
* @param edges a Collection of edges.
- * @param mapper the mapper function.
+ * @param vertexValueInitializer a map function that initializes the vertex values.
+ * It applies a map transformation to each vertex ID to produce the initial vertex value.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+ final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
- return fromDataSet(context.fromCollection(edges), mapper, context);
+ return fromDataSet(context.fromCollection(edges), vertexValueInitializer, context);
}
/**
@@ -157,8 +157,8 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a DataSet of edges, vertices are induced from the
- * edges. Vertices are created automatically and their values are set to
+ * Creates a graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set to
* NullValue.
*
* @param edges a DataSet of edges.
@@ -183,23 +183,23 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a DataSet of edges, vertices are induced from the
- * edges and vertex values are calculated by a mapper function. Vertices are
- * created automatically and their values are set by applying the provided
- * map function to the vertex ids.
+ * Creates a graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set
+ * by applying the provided map function to the vertex IDs.
*
* @param edges a DataSet of edges.
- * @param mapper the mapper function.
+ * @param vertexValueInitializer the mapper function that initializes the vertex values.
+ * It applies a map transformation to each vertex ID to produce the initial vertex value.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+ final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
- MapFunction.class, mapper.getClass(), 1, null, null);
+ MapFunction.class, vertexValueInitializer.getClass(), 1, null, null);
@SuppressWarnings({ "unchecked", "rawtypes" })
TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
@@ -209,7 +209,7 @@ public class Graph<K, VV, EV> {
.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
- return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
+ return new Vertex<K, VV>(value.f0, vertexValueInitializer.map(value.f0));
}
}).returns(returnType).withForwardedFields("f0");
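The vertex-induction pipeline above (emit source and target IDs, deduplicate, then map each ID through vertexValueInitializer) can be mirrored with Java streams. This is a plain-Java sketch, not Gelly code: edges are assumed to be long[]{source, target} arrays, a java.util.function.Function stands in for MapFunction<K, VV>, and the method name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InduceVerticesSketch {

    // Emit each edge's source and target ID, remove duplicates, then
    // compute every vertex value from its ID with the initializer.
    static <VV> Map<Long, VV> induceVertices(List<long[]> edges,
                                            Function<Long, VV> vertexValueInitializer) {
        return edges.stream()
                .flatMap(e -> Stream.of(e[0], e[1]))   // like EmitSrcAndTargetAsTuple1
                .distinct()                            // like distinct()
                .collect(Collectors.toMap(id -> id, vertexValueInitializer,
                        (a, b) -> a, TreeMap::new));
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(new long[] {1, 2}, new long[] {2, 3});
        System.out.println(induceVertices(edges, id -> id * 10.0)); // {1=10.0, 2=20.0, 3=30.0}
    }
}
```

Because the initializer only sees the vertex ID, it can derive values from the ID itself (as in the example) but cannot depend on the edges.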
@@ -226,13 +226,17 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a DataSet of Tuple objects for vertices and edges.
- *
- * Vertices with value are created from Tuple2, Edges with value are created
- * from Tuple3.
+ * Creates a graph from a DataSet of Tuple2 objects for vertices and
+ * Tuple3 objects for edges.
+ * <p>
+ * The first field of the Tuple2 vertex object will become the vertex ID
+ * and the second field will become the vertex value.
+ * The first field of the Tuple3 object for edges will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
*
- * @param vertices a DataSet of Tuple2.
- * @param edges a DataSet of Tuple3.
+ * @param vertices a DataSet of Tuple2 representing the vertices.
+ * @param edges a DataSet of Tuple3 representing the edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
@@ -245,13 +249,15 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a DataSet of Tuple objects for edges, vertices are
- * induced from the edges.
+ * Creates a graph from a DataSet of Tuple3 objects for edges.
+ * <p>
+ * The first field of the Tuple3 object will become the source ID,
+ * the second field will become the target ID, and the third field will become
+ * the edge value.
+ * <p>
+ * Vertices are created automatically and their values are set to NullValue.
*
- * Edges with value are created from Tuple3. Vertices are created
- * automatically and their values are set to NullValue.
- *
- * @param edges a DataSet of Tuple3.
+ * @param edges a DataSet of Tuple3 representing the edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
@@ -263,22 +269,78 @@ public class Graph<K, VV, EV> {
}
/**
- * Creates a graph from a DataSet of Tuple objects for edges, vertices are
- * induced from the edges and vertex values are calculated by a mapper
- * function. Edges with value are created from Tuple3. Vertices are created
- * automatically and their values are set by applying the provided map
- * function to the vertex ids.
+ * Creates a graph from a DataSet of Tuple3 objects for edges.
+ * <p>
+ * Each Tuple3 will become one Edge, where the source ID will be the first field of the Tuple3,
+ * the target ID will be the second field of the Tuple3
+ * and the Edge value will be the third field of the Tuple3.
+ * <p>
+ * Vertices are created automatically and their values are initialized
+ * by applying the provided vertexValueInitializer map function to the vertex IDs.
*
* @param edges a DataSet of Tuple3.
- * @param mapper the mapper function.
+ * @param vertexValueInitializer the mapper function that initializes the vertex values.
+ * It applies a map transformation to each vertex ID to produce the initial vertex value.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+ final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
- return fromDataSet(edgeDataSet, mapper, context);
+ return fromDataSet(edgeDataSet, vertexValueInitializer, context);
+ }
+
+ /**
+ * Creates a graph from a DataSet of Tuple2 objects for edges.
+ * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
+ * and the target ID will be the second field of the Tuple2.
+ * <p>
+ * Both the edge value type and the vertex value type will be NullValue.
+ *
+ * @param edges a DataSet of Tuple2.
+ * @param context the flink execution environment.
+ * @return the newly created graph.
+ */
+ public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
+ ExecutionEnvironment context) {
+
+ DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
+ new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+
+ public Edge<K, NullValue> map(Tuple2<K, K> input) {
+ return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+ }
+ }).withForwardedFields("f0; f1");
+ return fromDataSet(edgeDataSet, context);
+ }
+
+ /**
+ * Creates a graph from a DataSet of Tuple2 objects for edges.
+ * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
+ * and the target ID will be the second field of the Tuple2.
+ * <p>
+ * Edge value types will be set to NullValue.
+ * Vertex values can be initialized by applying a user-defined map function on the vertex IDs.
+ *
+ * @param edges a DataSet of Tuple2, where the first field corresponds to the source ID
+ * and the second field corresponds to the target ID.
+ * @param vertexValueInitializer the mapper function that initializes the vertex values.
+ * It applies a map transformation to each vertex ID to produce the initial vertex value.
+ * @param context the flink execution environment.
+ * @return the newly created graph.
+ */
+ public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
+ final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
+
+ DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
+ new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+
+ public Edge<K, NullValue> map(Tuple2<K, K> input) {
+ return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+ }
+ }).withForwardedFields("f0; f1");
+ return fromDataSet(edgeDataSet, vertexValueInitializer, context);
}
/**
@@ -318,10 +380,11 @@ public class Graph<K, VV, EV> {
/**
* Creates a graph from a CSV file of edges. Vertices will be created automatically and
- * Vertex values are set by the provided mapper.
+ * Vertex values can be initialized using a user-defined mapper.
*
* @param edgesPath a path to a CSV file with the Edge data
- * @param mapper the mapper function.
+ * @param vertexValueInitializer the mapper function that initializes the vertex values.
+ * It applies a map transformation to each vertex ID to produce the initial vertex value.
* @param context the execution environment.
* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
* on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
@@ -332,8 +395,8 @@ public class Graph<K, VV, EV> {
* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
*/
public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
- final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
- return new GraphCsvReader(edgesPath, mapper, context);
+ final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
+ return new GraphCsvReader(edgesPath, vertexValueInitializer, context);
}
/**
@@ -501,29 +564,37 @@ public class Graph<K, VV, EV> {
}
/**
- * Joins the vertex DataSet of this graph with an input DataSet and applies
- * a UDF on the resulted values.
+ * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+ * a user-defined transformation on the values of the matched records.
+ * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
*
- * @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @return a new graph where the vertex values have been updated.
- */
+ * @param inputDataSet the Tuple2 DataSet to join with.
+ * The first field of the Tuple2 is used as the join key and the second field is passed
+ * as a parameter to the transformation function.
+ * @param vertexJoinFunction the transformation function to apply.
+ * The first parameter is the current vertex value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @return a new Graph, where the vertex values have been updated according to the
+ * result of the vertexJoinFunction.
+ *
+ * @param <T> the type of the second field of the input Tuple2 DataSet.
+ */
public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<VV, T>, VV> mapper) {
+ final VertexJoinFunction<VV, T> vertexJoinFunction) {
DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
.coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
+ .with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
}
private static final class ApplyCoGroupToVertexValues<K, VV, T>
implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
- private MapFunction<Tuple2<VV, T>, VV> mapper;
+ private VertexJoinFunction<VV, T> vertexJoinFunction;
- public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
- this.mapper = mapper;
+ public ApplyCoGroupToVertexValues(VertexJoinFunction<VV, T> vertexJoinFunction) {
+ this.vertexJoinFunction = vertexJoinFunction;
}
@Override
@@ -537,42 +608,46 @@ public class Graph<K, VV, EV> {
if (inputIterator.hasNext()) {
final Tuple2<K, T> inputNext = inputIterator.next();
- collector.collect(new Vertex<K, VV>(inputNext.f0, mapper
- .map(new Tuple2<VV, T>(vertexIterator.next().f1,
- inputNext.f1))));
+ collector.collect(new Vertex<K, VV>(inputNext.f0, vertexJoinFunction
+ .vertexJoin(vertexIterator.next().f1, inputNext.f1)));
} else {
collector.collect(vertexIterator.next());
}
-
}
}
}
/**
- * Joins the edge DataSet with an input DataSet on a composite key of both
- * source and target and applies a UDF on the resulted values.
+ * Joins the edge DataSet with an input DataSet on the composite key of both
+ * source and target IDs and applies a user-defined transformation on the values
+ * of the matched records. The first two fields of the input DataSet are used as join keys.
*
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first two fields of the Tuple3 are used as the composite join key
+ * and the third field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple3 from the input DataSet.
+ * @param <T> the type of the third field of the input Tuple3 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
+ final EdgeJoinFunction<EV, T> edgeJoinFunction) {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
- .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
+ .with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
}
private static final class ApplyCoGroupToEdgeValues<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
- private MapFunction<Tuple2<EV, T>, EV> mapper;
+ private EdgeJoinFunction<EV, T> edgeJoinFunction;
- public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
- this.mapper = mapper;
+ public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> edgeJoinFunction) {
+ this.edgeJoinFunction = edgeJoinFunction;
}
@Override
@@ -587,8 +662,8 @@ public class Graph<K, VV, EV> {
final Tuple3<K, K, T> inputNext = inputIterator.next();
collector.collect(new Edge<K, EV>(inputNext.f0,
- inputNext.f1, mapper.map(new Tuple2<EV, T>(
- edgesIterator.next().f2, inputNext.f2))));
+ inputNext.f1, edgeJoinFunction.edgeJoin(
+ edgesIterator.next().f2, inputNext.f2)));
} else {
collector.collect(edgesIterator.next());
}
@@ -597,22 +672,26 @@ public class Graph<K, VV, EV> {
}
/**
- * Joins the edge DataSet with an input DataSet on the source key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. In case the inputDataSet contains the same key more
- * than once, only the first value will be considered.
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The source ID of the edges input and the first field of the input DataSet are used as join keys.
*
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @param <T> the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
+ final EdgeJoinFunction<EV, T> edgeJoinFunction) {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+ .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
}
@@ -620,11 +699,10 @@ public class Graph<K, VV, EV> {
private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
- private MapFunction<Tuple2<EV, T>, EV> mapper;
+ private EdgeJoinFunction<EV, T> edgeJoinFunction;
- public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
- MapFunction<Tuple2<EV, T>, EV> mapper) {
- this.mapper = mapper;
+ public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(EdgeJoinFunction<EV, T> edgeJoinFunction) {
+ this.edgeJoinFunction = edgeJoinFunction;
}
@Override
@@ -641,8 +719,7 @@ public class Graph<K, VV, EV> {
Edge<K, EV> edgesNext = edgesIterator.next();
collector.collect(new Edge<K, EV>(edgesNext.f0,
- edgesNext.f1, mapper.map(new Tuple2<EV, T>(
- edgesNext.f2, inputNext.f1))));
+ edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
}
} else {
@@ -654,22 +731,26 @@ public class Graph<K, VV, EV> {
}
/**
- * Joins the edge DataSet with an input DataSet on the target key of the
- * edges and the first attribute of the input DataSet and applies a UDF on
- * the resulted values. Should the inputDataSet contain the same key more
- * than once, only the first value will be considered.
+ * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+ * on the values of the matched records.
+ * The target ID of the edges input and the first field of the input DataSet are used as join keys.
*
* @param inputDataSet the DataSet to join with.
- * @param mapper the UDF map function to apply.
- * @param <T> the return type
- * @return a new graph where the edge values have been updated.
- */
+ * The first field of the Tuple2 is used as the join key
+ * and the second field is passed as a parameter to the transformation function.
+ * @param edgeJoinFunction the transformation function to apply.
+ * The first parameter is the current edge value and the second parameter is the value
+ * of the matched Tuple2 from the input DataSet.
+ * @param <T> the type of the second field of the input Tuple2 DataSet.
+ * @return a new Graph, where the edge values have been updated according to the
+ * result of the edgeJoinFunction.
+ */
public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet,
- final MapFunction<Tuple2<EV, T>, EV> mapper) {
+ final EdgeJoinFunction<EV, T> edgeJoinFunction) {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(1).equalTo(0)
- .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+ .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
}
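joinWithEdgesOnSource and joinWithEdgesOnTarget differ only in which edge field is used as the join key (where(0) versus where(1)). The plain-Java sketch below, which is not Gelly code, makes the difference visible. It assumes long[]{source, target, value} edges, an input given as a Map (so there is exactly one value per key), and a BiFunction in place of EdgeJoinFunction. As in the coGroup shown in the diff, every edge sharing the key is joined with that same single input value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

class JoinOnSourceOrTargetSketch {

    // Joins each edge with the input value keyed by its source ID
    // (onSource = true) or its target ID (onSource = false).
    // Unmatched edges are returned unchanged.
    static List<String> joinOnSourceOrTarget(List<long[]> edges, Map<Long, Long> input,
                                             boolean onSource,
                                             BiFunction<Long, Long, Long> edgeJoin) {
        List<String> result = new ArrayList<>();
        for (long[] e : edges) {
            long key = onSource ? e[0] : e[1];
            long value = input.containsKey(key) ? edgeJoin.apply(e[2], input.get(key)) : e[2];
            result.add("(" + e[0] + "," + e[1] + "," + value + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2, 10}, new long[] {1, 3, 20}, new long[] {2, 3, 30});
        Map<Long, Long> input = new TreeMap<>();
        input.put(1L, 1L);
        input.put(3L, 3L);
        System.out.println(joinOnSourceOrTarget(edges, input, true, Long::sum));
        // [(1,2,11), (1,3,21), (2,3,30)]  (like joinWithEdgesOnSource)
        System.out.println(joinOnSourceOrTarget(edges, input, false, Long::sum));
        // [(1,2,10), (1,3,23), (2,3,33)]  (like joinWithEdgesOnTarget)
    }
}
```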
@@ -798,20 +879,21 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges has access to the vertex value.
+ * Groups by vertex and computes a GroupReduce transformation over the edges of each vertex.
+ * The edgesFunction applied on the edges has access to both the id and the value
+ * of the grouping vertex.
+ *
+ * For each vertex, the edgesFunction can iterate over all edges of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
*
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
- * @return a dataset of a T
+ * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
+ * @param <T> the output type
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -829,21 +911,22 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges has access to the vertex value.
- *
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
+ * Groups by vertex and computes a GroupReduce transformation over the edges of each vertex.
+ * The edgesFunction applied on the edges has access to both the id and the value
+ * of the grouping vertex.
+ *
+ * For each vertex, the edgesFunction can iterate over all edges of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
+ * @param <T> the output type
* @param typeInfo the explicit return type.
- * @return a dataset of a T
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
- EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -861,20 +944,21 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges only has access to the vertex id (not the vertex value).
+ * Groups by vertex and computes a GroupReduce transformation over the edges of each vertex.
+ * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
+ * of the grouping vertex.
*
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
- * @return a dataset of T
+ * For each vertex, the edgesFunction can iterate over all edges of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
+ * @param <T> the output type
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -894,21 +978,22 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the edges of each vertex. The function applied
- * on the edges only has access to the vertex id (not the vertex value).
- *
- * @param edgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @param <T>
- * the output type
+ * Groups by vertex and computes a GroupReduce transformation over the edges of each vertex.
+ * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
+ * of the grouping vertex.
+ *
+ * For each vertex, the edgesFunction can iterate over all edges of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
+ * @param <T> the output type
* @param typeInfo the explicit return type.
- * @return a dataset of T
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
- EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -1515,18 +1600,22 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the neighbors (edges and vertices) of each
- * vertex. The function applied on the neighbors has access to the vertex
- * value.
+ * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+ * of each vertex. The neighborsFunction applied on the neighbors has access to both the vertex id
+ * and the vertex value of the grouping vertex.
*
- * @param neighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
+ * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+ * of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
* @param <T> the output type
- * @return a dataset of a T
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1558,19 +1647,23 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the neighbors (edges and vertices) of each
- * vertex. The function applied on the neighbors has access to the vertex
- * value.
- *
- * @param neighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
+ * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+ * of each vertex. The neighborsFunction applied on the neighbors has access to both the vertex id
+ * and the vertex value of the grouping vertex.
+ *
+ * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+ * of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
* @param <T> the output type
- * @param typeInfo the explicit return type.
- * @return a dataset of a T
+ * @param typeInfo the explicit return type
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1601,20 +1694,23 @@ public class Graph<K, VV, EV> {
}
}
-
/**
- * Compute an aggregate over the neighbors (edges and vertices) of each
- * vertex. The function applied on the neighbors only has access to the
- * vertex id (not the vertex value).
+ * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+ * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
+ * (not the vertex value) of the grouping vertex.
*
- * @param neighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
+ * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+ * of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
* @param <T> the output type
- * @return a dataset of a T
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1647,19 +1743,23 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the neighbors (edges and vertices) of each
- * vertex. The function applied on the neighbors only has access to the
- * vertex id (not the vertex value).
- *
- * @param neighborsFunction the function to apply to the neighborhood
- * @param direction the edge direction (in-, out-, all-)
+ * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+ * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
+ * (not the vertex value) of the grouping vertex.
+ *
+ * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+ * with the specified direction, and emit any number of output elements, including none.
+ *
+ * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+ * of each vertex.
+ * @param direction the edge direction (in-, out-, all-).
* @param <T> the output type
- * @param typeInfo the explicit return type.
- * @return a dataset of a T
+ * @param typeInfo the explicit return type
+ * @return a DataSet containing elements of type T
* @throws IllegalArgumentException
- */
+ */
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+ EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1842,16 +1942,21 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the neighbor values of each
- * vertex.
+ * Computes a reduce transformation over the vertex values of each vertex's neighbors.
+ * For each vertex, the transformation consecutively calls a
+ * {@link ReduceNeighborsFunction} until only a single value for each vertex remains.
+ * The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values
+ * into one new value of the same type.
*
- * @param reduceNeighborsFunction the function to apply to the neighborhood
+ * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
* @param direction the edge direction (in-, out-, all-)
- * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+ * @return a DataSet of Tuple2, with one tuple per vertex.
+ * The first field of the Tuple2 is the vertex ID and the second field
+ * is the aggregate value computed by the provided {@link ReduceNeighborsFunction}.
* @throws IllegalArgumentException
- */
+ */
public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <vertex-source value> pairs
@@ -1900,17 +2005,20 @@ public class Graph<K, VV, EV> {
}
/**
- * Compute an aggregate over the edge values of each vertex.
+ * Computes a reduce transformation over the edge values of each vertex.
+ * For each vertex, the transformation consecutively calls a
+ * {@link ReduceEdgesFunction} until only a single value for each vertex remains.
+ * The {@link ReduceEdgesFunction} combines two edge values into one new value of the same type.
*
- * @param reduceEdgesFunction
- * the function to apply to the neighborhood
- * @param direction
- * the edge direction (in-, out-, all-)
- * @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
+ * @param reduceEdgesFunction the reduce function to apply to the neighboring edges of each vertex.
+ * @param direction the edge direction (in-, out-, all-)
+ * @return a DataSet of Tuple2, with one tuple per vertex.
+ * The first field of the Tuple2 is the vertex ID and the second field
+ * is the aggregate value computed by the provided {@link ReduceEdgesFunction}.
* @throws IllegalArgumentException
- */
+ */
public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index a21b23d..1a32204 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -36,5 +36,24 @@ import org.apache.flink.util.Collector;
*/
public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable {
+ /**
+ * This method is called per vertex and can iterate over all of its neighbors
+ * with the specified direction.
+ * <p>
+ * If called with {@link EdgeDirection#OUT} the group will contain
+ * the out-edges and neighboring vertices of the grouping vertex.
+ * If called with {@link EdgeDirection#IN} the group will contain
+ * the in-edges and neighboring vertices of the grouping vertex.
+ * If called with {@link EdgeDirection#ALL} the group will contain
+ * all edges and neighboring vertices of the grouping vertex.
+ * <p>
+ * The method can emit any number of output elements, including none.
+ *
+ * @param neighbors the neighbors of the grouping vertex.
+ * The first field of each Tuple3 is the ID of the grouping vertex.
+ * The second field is the neighboring edge, and the third field is the neighboring vertex.
+ * @param out the collector to emit results to
+ * @throws Exception
+ */
void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
}
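The direction rules in the `iterateNeighbors` Javadoc above can be illustrated with a plain-Java sketch of how the per-vertex groups are formed. It does not use Flink. `Direction`, `SimpleEdge`, `SimpleVertex`, `NeighborTriple`, `SimpleNeighborsFunction` and `NeighborsSketch` are hypothetical stand-ins. `NeighborTriple` mirrors `Tuple3<K, Edge<K, EV>, Vertex<K, VV>>`, and a `Consumer` replaces the `Collector`.

The sketch simplifies Gelly's behavior in two ways. It calls the function only for vertices that have at least one neighbor in the requested direction. It also does not model Flink's parallel, unordered grouping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins; not Gelly or Flink classes.
enum Direction { IN, OUT, ALL }
record SimpleEdge<K, EV>(K source, K target, EV value) {}
record SimpleVertex<K, VV>(K id, VV value) {}
// Mirrors Tuple3<K, Edge<K, EV>, Vertex<K, VV>>: grouping id, edge, neighboring vertex.
record NeighborTriple<K, EV, VV>(K groupingId, SimpleEdge<K, EV> edge, SimpleVertex<K, VV> neighbor) {}

@FunctionalInterface
interface SimpleNeighborsFunction<K, VV, EV, O> {
    void iterateNeighbors(Iterable<NeighborTriple<K, EV, VV>> neighbors, Consumer<O> out);
}

final class NeighborsSketch {
    static <K, VV, EV, O> List<O> groupReduceOnNeighbors(
            Map<K, SimpleVertex<K, VV>> vertices, List<SimpleEdge<K, EV>> edges,
            Direction direction, SimpleNeighborsFunction<K, VV, EV, O> fn) {
        Map<K, List<NeighborTriple<K, EV, VV>>> groups = new LinkedHashMap<>();
        for (SimpleEdge<K, EV> e : edges) {
            if (direction != Direction.IN) {   // OUT or ALL: e is an out-edge of its source
                groups.computeIfAbsent(e.source(), k -> new ArrayList<>())
                      .add(new NeighborTriple<>(e.source(), e, vertices.get(e.target())));
            }
            if (direction != Direction.OUT) {  // IN or ALL: e is an in-edge of its target
                groups.computeIfAbsent(e.target(), k -> new ArrayList<>())
                      .add(new NeighborTriple<>(e.target(), e, vertices.get(e.source())));
            }
        }
        List<O> results = new ArrayList<>();
        // One call per grouping vertex; the function may emit zero or more results.
        for (List<NeighborTriple<K, EV, VV>> group : groups.values()) {
            fn.iterateNeighbors(group, results::add);
        }
        return results;
    }
}
```

For ALL, an edge is emitted once for each of its endpoints, so each endpoint sees the other endpoint as a neighbor.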
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index fdf54fa..657238c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -36,5 +36,25 @@ import org.apache.flink.util.Collector;
*/
public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
+ /**
+ * This method is called per vertex and can iterate over all of its neighbors
+ * with the specified direction.
+ * <p>
+ * If called with {@link EdgeDirection#OUT} the group will contain
+ * the out-edges and neighboring vertices of the grouping vertex.
+ * If called with {@link EdgeDirection#IN} the group will contain
+ * the in-edges and neighboring vertices of the grouping vertex.
+ * If called with {@link EdgeDirection#ALL} the group will contain
+ * all edges and neighboring vertices of the grouping vertex.
+ * <p>
+ * The method can emit any number of output elements, including none.
+ *
+ * @param vertex the grouping Vertex
+ * @param neighbors the neighbors of the grouping vertex.
+ * The first field of each Tuple2 is the neighboring edge,
+ * and the second field is the neighboring vertex.
+ * @param out the collector to emit results to
+ * @throws Exception
+ */
void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
}
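Because the grouping vertex is passed in separately, a function of this kind can compare each neighbor against the vertex's own value. The sketch below is illustrative and uses hypothetical plain-Java stand-ins rather than Gelly classes: `SimpleEdge`, `SimpleVertex`, `SimpleNeighborsFunctionWithVertexValue` and `LargerNeighbors`. `Map.Entry<Edge, Vertex>` plays the role of `Tuple2<Edge<K, EV>, Vertex<K, VV>>`, and a `Consumer` replaces the `Collector`. The function emits the IDs of neighbors whose value is larger than the grouping vertex's value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins; not Gelly classes.
record SimpleEdge<K, EV>(K source, K target, EV value) {}
record SimpleVertex<K, VV>(K id, VV value) {}

// Same shape as NeighborsFunctionWithVertexValue: the grouping vertex plus (edge, neighbor) pairs.
@FunctionalInterface
interface SimpleNeighborsFunctionWithVertexValue<K, VV, EV, O> {
    void iterateNeighbors(SimpleVertex<K, VV> vertex,
                          Iterable<Map.Entry<SimpleEdge<K, EV>, SimpleVertex<K, VV>>> neighbors,
                          Consumer<O> out);
}

final class LargerNeighbors {
    // Emits the id of every neighbor whose value exceeds the grouping vertex's value.
    static final SimpleNeighborsFunctionWithVertexValue<Long, Double, Double, Long> FUNCTION =
        (vertex, neighbors, out) -> {
            for (Map.Entry<SimpleEdge<Long, Double>, SimpleVertex<Long, Double>> n : neighbors) {
                if (n.getValue().value() > vertex.value()) {
                    out.accept(n.getValue().id());
                }
            }
        };

    // Tiny driver: invokes the function for a single grouping vertex and collects the output.
    static List<Long> run(SimpleVertex<Long, Double> vertex,
                          List<Map.Entry<SimpleEdge<Long, Double>, SimpleVertex<Long, Double>>> neighbors) {
        List<Long> out = new ArrayList<>();
        FUNCTION.iterateNeighbors(vertex, neighbors, out::add);
        return out;
    }
}
```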
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 84eec51..e7631a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -30,5 +30,14 @@ import java.io.Serializable;
*/
public interface ReduceEdgesFunction<EV> extends Function, Serializable {
+ /**
+ * Combines two neighboring edge values into one new value of the same type.
+ * For each vertex, this function is called repeatedly
+ * until only a single value remains for that vertex.
+ *
+ * @param firstEdgeValue the first neighboring edge value to combine
+ * @param secondEdgeValue the second neighboring edge value to combine
+ * @return the combined value of both input values
+ */
EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
}
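The pairwise combination described for `reduceEdges` can be sketched in plain Java. The sketch takes the OUT direction and keeps the minimum edge weight per vertex. `SimpleEdge` and `ReduceOnEdgesSketch` are hypothetical names, and `BinaryOperator<EV>` stands in for `ReduceEdgesFunction<EV>`. In a distributed reduce the order in which values are combined is generally not guaranteed, so a function like this should in practice be associative and commutative, as `min` is.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Gelly's Edge<K, EV>.
record SimpleEdge<K, EV>(K source, K target, EV value) {}

final class ReduceOnEdgesSketch {
    // Mimics reduceOnEdges(..., OUT): one (vertex id, combined value) entry per vertex
    // that has at least one out-edge.
    static <K, EV> Map<K, EV> reduceOnOutEdges(List<SimpleEdge<K, EV>> edges,
                                               BinaryOperator<EV> reduceEdges) {
        Map<K, EV> result = new LinkedHashMap<>();
        for (SimpleEdge<K, EV> e : edges) {
            // merge() calls reduceEdges on the running value and the next edge value,
            // so only a single value per vertex remains at the end.
            result.merge(e.source(), e.value(), reduceEdges);
        }
        return result;
    }
}
```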
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index fc5295d..5b423e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -31,5 +31,14 @@ import java.io.Serializable;
*/
public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
+ /**
+ * Combines two neighboring vertex values into one new value of the same type.
+ * For each vertex, this function is called repeatedly
+ * until only a single value remains for that vertex.
+ *
+ * @param firstNeighborValue the first neighboring vertex value to combine
+ * @param secondNeighborValue the second neighboring vertex value to combine
+ * @return the combined value of both input values
+ */
VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
}
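A plain-Java sketch of `reduceOnNeighbors` for the IN direction follows. For every vertex with at least one in-edge, it combines the values of the source vertices of those edges into one value, here by summing. `SimpleEdge` and `ReduceOnNeighborsSketch` are hypothetical, and `BinaryOperator<VV>` stands in for `ReduceNeighborsFunction<VV>`. The edge values are ignored, because this reduction only sees neighbor vertex values.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Gelly's Edge<K, EV>.
record SimpleEdge<K, EV>(K source, K target, EV value) {}

final class ReduceOnNeighborsSketch {
    // Mimics reduceOnNeighbors(..., IN): one (vertex id, combined neighbor value) entry
    // per vertex that has at least one in-neighbor.
    static <K, VV, EV> Map<K, VV> reduceOnInNeighbors(Map<K, VV> vertexValues,
                                                      List<SimpleEdge<K, EV>> edges,
                                                      BinaryOperator<VV> reduceNeighbors) {
        Map<K, VV> result = new LinkedHashMap<>();
        for (SimpleEdge<K, EV> e : edges) {
            VV neighborValue = vertexValues.get(e.source()); // value of the in-neighbor
            result.merge(e.target(), neighborValue, reduceNeighbors);
        }
        return result;
    }
}
```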
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
new file mode 100644
index 0000000..a30d1a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface to be implemented by the transformation function
+ * applied in the {@link Graph#joinWithVertices(org.apache.flink.api.java.DataSet, VertexJoinFunction)} method.
+ *
+ * @param <VV> the vertex value type
+ * @param <T> the input value type
+ */
+public interface VertexJoinFunction<VV, T> extends Function, Serializable {
+
+ /**
+ * Applies a transformation on the current vertex value
+ * and the value of the matched tuple of the input DataSet.
+ *
+ * @param vertexValue the current vertex value
+ * @param inputValue the value of the matched Tuple2 input
+ * @return the new vertex value
+ */
+ VV vertexJoin(VV vertexValue, T inputValue) throws Exception;
+}
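The role of `vertexJoin` can be illustrated with a plain-Java sketch of a vertex join. It does not use Flink, and `SimpleVertexJoinFunction` and `JoinWithVerticesSketch` are hypothetical names. The sketch assumes that each vertex is matched on its ID against the key of the input pairs, by analogy with the edge joins above. It also assumes that only the first input value per key is used and that unmatched vertices keep their current value.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same shape as VertexJoinFunction.vertexJoin(VV, T).
@FunctionalInterface
interface SimpleVertexJoinFunction<VV, T> {
    VV vertexJoin(VV vertexValue, T inputValue);
}

final class JoinWithVerticesSketch {
    // vertices: vertex id -> current value; input: (vertex id, input value) pairs.
    static <K, VV, T> Map<K, VV> joinWithVertices(Map<K, VV> vertices,
                                                  List<Map.Entry<K, T>> input,
                                                  SimpleVertexJoinFunction<VV, T> vertexJoinFunction) {
        Map<K, T> firstValuePerKey = new HashMap<>();
        for (Map.Entry<K, T> entry : input) {
            firstValuePerKey.putIfAbsent(entry.getKey(), entry.getValue());
        }
        Map<K, VV> result = new LinkedHashMap<>();
        for (Map.Entry<K, VV> vertex : vertices.entrySet()) {
            T match = firstValuePerKey.get(vertex.getKey());
            // Unmatched vertices keep their current value (assumption of this sketch).
            result.put(vertex.getKey(), match == null
                    ? vertex.getValue()
                    : vertexJoinFunction.vertexJoin(vertex.getValue(), match));
        }
        return result;
    }
}
```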