You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/09/29 00:39:03 UTC
[3/3] flink git commit: [FLINK-2561] [gelly] add missing methods to
Graph: add-remove edges/vertices, difference, graph creation methods, validate,
getTriplets. Add missing utility mappers.
[FLINK-2561] [gelly] add missing methods to Graph:
add-remove edges/vertices, difference, graph creation methods,
validate, getTriplets. Add missing utility mappers.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b4dc067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b4dc067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b4dc067
Branch: refs/heads/master
Commit: 0b4dc067fee82654fae3292bf6bf7d59157bf5c0
Parents: 0f17755
Author: vasia <va...@apache.org>
Authored: Thu Sep 24 22:08:10 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/graph/scala/Graph.scala | 151 ++++++++++++++++++-
.../graph/scala/utils/Tuple2ToVertexMap.scala | 31 ++++
.../graph/scala/utils/Tuple3ToEdgeMap.scala | 31 ++++
3 files changed, 212 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 73e175e..ed58ffd 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -23,26 +23,108 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._
import org.apache.flink.graph._
+import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
import org.apache.flink.{graph => jg}
-
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
+import org.apache.flink.types.NullValue
object Graph {
+
+  /**
+   * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+   *
+   * @param vertices the DataSet of vertices
+   * @param edges the DataSet of edges
+   * @param env the execution environment
+   * @return the new Graph
+   */
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))
}
+
+  /**
+   * Creates a Graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   *
+   * @param edges the DataSet of edges
+   * @param env the execution environment
+   * @return the new Graph, whose vertex values are NullValue
+   */
+  def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+      (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a DataSet of edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * map function to the vertex ids.
+   *
+   * @param edges the DataSet of edges
+   * @param env the execution environment
+   * @param mapper the function mapping each vertex id to its initial vertex value
+   * @return the new Graph
+   */
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+      TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment,
+      mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a Seq of vertices and a Seq of edges.
+   *
+   * @param vertices the vertices of the graph
+   * @param edges the edges of the graph
+   * @param env the execution environment
+   * @return the new Graph
+   */
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges
.asJavaCollection, env.getJavaEnv))
}
+
+  /**
+   * Creates a Graph from a Seq of edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   *
+   * @param edges the edges of the graph
+   * @param env the execution environment
+   * @return the new Graph, whose vertex values are NullValue
+   */
+  def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+      (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a Seq of edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * map function to the vertex ids.
+   *
+   * @param edges the edges of the graph
+   * @param env the execution environment
+   * @param mapper the function mapping each vertex id to its initial vertex value
+   * @return the new Graph
+   */
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+      TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment,
+      mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a DataSet of (id, value) vertex tuples and a DataSet of
+   * (sourceId, targetId, value) edge tuples.
+   *
+   * @param vertices the vertices, as (vertexId, vertexValue) tuples
+   * @param edges the edges, as (sourceId, targetId, edgeValue) tuples
+   * @param env the execution environment
+   * @return the new Graph
+   */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+      TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    // The Java factory takes Java tuples, so the Scala tuples are converted first.
+    val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    val javaTupleEdges = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
+      env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * Vertices are created automatically and their values are set to NullValue.
+   *
+   * @param edges the edges, as (sourceId, targetId, edgeValue) tuples
+   * @param env the execution environment
+   * @return the new Graph, whose vertex values are NullValue
+   */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+      (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    val javaTupleEdges = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv))
+  }
+
+  /**
+   * Creates a Graph from a DataSet of Tuples representing the edges.
+   * Vertices are created automatically and their values are set by applying the provided
+   * map function to the vertex ids.
+   *
+   * @param edges the edges, as (sourceId, targetId, edgeValue) tuples
+   * @param env the execution environment
+   * @param mapper the function mapping each vertex id to its initial vertex value
+   * @return the new Graph
+   */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+      TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment,
+      mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    val javaTupleEdges = edges.map(e => new jtuple.Tuple3(e._1, e._2, e._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
+  }
+
}
/**
@@ -93,6 +175,14 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+   * Returns the triplets of this graph, each combining an edge with the values
+   * of its source and target vertices.
+   *
+   * @return a DataSet of Triplets, each holding
+   *         (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+   */
+  def getTriplets(): DataSet[Triplet[K, VV, EV]] = wrap(jgraph.getTriplets())
+
+ /**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
@@ -575,6 +665,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+   * Adds the given list of vertices to the graph.
+   * Vertices that already exist in the graph are not added a second time.
+   *
+   * @param vertices the list of vertices to add
+   * @return a new graph containing the existing vertices plus the newly added ones
+   */
+  def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addVertices(vertices.asJava))
+  }
+
+  /**
+   * Adds the given list of edges to the graph.
+   *
+   * When adding an edge for a non-existing set of vertices, the edge is considered
+   * invalid and ignored.
+   *
+   * @param edges the list of edges to add
+   * @return a new graph containing the existing edges plus the newly added edges
+   */
+  def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addEdges(edges.asJava))
+  }
+
+ /**
* Adds the given edge to the graph. If the source and target vertices do
* not exist in the graph, they will also be added.
*
@@ -599,6 +712,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
wrapGraph(jgraph.removeVertex(vertex))
}
+  /**
+   * Removes the given vertices and their edges from the graph.
+   *
+   * @param vertices the list of vertices to remove
+   * @return a new graph containing the existing vertices and edges without
+   *         the removed vertices and their edges
+   */
+  def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeVertices(vertices.asJava))
+  }
+
/**
* Removes all edges that match the given edge from the graph.
*
@@ -611,6 +735,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+   * Removes all the edges that match the edges in the given list from the graph.
+   *
+   * @param edges the list of edges to be removed
+   * @return a new graph where the edges have been removed and in which the vertices
+   *         remained intact
+   */
+  def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeEdges(edges.asJava))
+  }
+
+ /**
* Performs union on the vertices and edges sets of the input graphs
* removing duplicate vertices but maintaining duplicate edges.
*
@@ -622,6 +756,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+   * Performs difference on the vertex and edge sets of the input graphs,
+   * removing common vertices and edges. If a source/target vertex is removed,
+   * its corresponding edge will also be removed.
+   *
+   * @param graph the graph to perform difference with
+   * @return a new graph where the common vertices and edges have been removed
+   */
+  def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.difference(graph.getWrappedGraph))
+  }
+
+ /**
* Compute an aggregate over the neighbor values of each
* vertex.
*
@@ -732,4 +876,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
maxIterations, parameters))
}
+
+  /**
+   * Checks this graph with the given validator by delegating to the wrapped Java graph.
+   *
+   * @param validator the GraphValidator to apply to this graph
+   * @return the validation outcome reported by the wrapped Java graph
+   */
+  def validate(validator: GraphValidator[K, VV, EV]): Boolean =
+    jgraph.validate(validator)
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
new file mode 100644
index 0000000..f2b1133
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+/**
+ * A MapFunction that converts a Scala `(id, value)` tuple into a [[Vertex]].
+ *
+ * @tparam K the vertex id type
+ * @tparam VV the vertex value type
+ */
+// @SerialVersionUID emits the static serialVersionUID field that Java serialization reads;
+// a Scala `private val serialVersionUID` would only be an instance field and has no effect.
+@SerialVersionUID(1L)
+class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
+
+  override def map(value: (K, VV)): Vertex[K, VV] = {
+    new Vertex(value._1, value._2)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
new file mode 100644
index 0000000..00cb074
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+/**
+ * A MapFunction that converts a Scala `(sourceId, targetId, value)` tuple into an [[Edge]].
+ *
+ * @tparam K the vertex id type
+ * @tparam EV the edge value type
+ */
+// @SerialVersionUID emits the static serialVersionUID field that Java serialization reads;
+// a Scala `private val serialVersionUID` would only be an instance field and has no effect.
+@SerialVersionUID(1L)
+class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
+
+  override def map(value: (K, K, EV)): Edge[K, EV] = {
+    new Edge(value._1, value._2, value._3)
+  }
+}