You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/09/29 00:39:03 UTC

[3/3] flink git commit: [FLINK-2561] [gelly] add missing methods to Graph: add-remove edges/vertices, difference, graph creation methods, validate, getTriplets. Add missing utility mappers.

[FLINK-2561] [gelly] add missing methods to Graph:
add-remove edges/vertices, difference, graph creation methods,
validate, getTriplets. Add missing utility mappers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b4dc067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b4dc067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b4dc067

Branch: refs/heads/master
Commit: 0b4dc067fee82654fae3292bf6bf7d59157bf5c0
Parents: 0f17755
Author: vasia <va...@apache.org>
Authored: Thu Sep 24 22:08:10 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/graph/scala/Graph.scala    | 151 ++++++++++++++++++-
 .../graph/scala/utils/Tuple2ToVertexMap.scala   |  31 ++++
 .../graph/scala/utils/Tuple3ToEdgeMap.scala     |  31 ++++
 3 files changed, 212 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 73e175e..ed58ffd 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -23,26 +23,108 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{tuple => jtuple}
 import org.apache.flink.api.scala._
 import org.apache.flink.graph._
+import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
 import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
 import org.apache.flink.{graph => jg}
-
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.reflect.ClassTag
+import org.apache.flink.types.NullValue
 
 object Graph {
+
+  /**
+  * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+  */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))
   }
 
+  /**
+  * Creates a Graph from a DataSet of edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+  (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a graph from a DataSet of edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment,
+  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a Seq of vertices and a Seq of edges.
+  */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
   ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges
       .asJavaCollection, env.getJavaEnv))
   }
+
+  /**
+  * Creates a Graph from a Seq of edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+  (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a graph from a Seq of edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment,
+  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of vertex Tuples and a DataSet of edge Tuples.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
+                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuples representing the edges.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+  (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuples representing the edges.
+  * Vertices are created automatically and their values are set by applying the provided
+  * map function to the vertex ids.
+  */
+  def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment,
+  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+    val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
+  }
+
 }
 
 /**
@@ -93,6 +175,14 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+  * @return a DataSet of Triplets,
+  * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+  */
+  def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
+    wrap(jgraph.getTriplets())
+  }
+
+  /**
    * Apply a function to the attribute of each vertex in the graph.
    *
    * @param mapper the map function to apply.
@@ -575,6 +665,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+  * Adds the list of vertices, passed as input, to the graph.
+  * If the vertices already exist in the graph, they will not be added once more.
+  *
+  * @param vertices the list of vertices to add
+  * @return the new graph containing the existing and newly added vertices
+  */
+  def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addVertices(vertices.asJava))
+  }
+
+  /**
+  * Adds the given list of edges to the graph.
+  * When adding an edge for a non-existing set of vertices, the edge is considered
+  * invalid and ignored.
+  *
+  * @param edges the list of edges to be added
+  * @return a new graph containing the existing edges plus the newly added edges.
+  */
+  def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addEdges(edges.asJava))
+  }
+
+  /**
    * Adds the given edge to the graph. If the source and target vertices do
    * not exist in the graph, they will also be added.
    *
@@ -599,6 +712,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
     wrapGraph(jgraph.removeVertex(vertex))
   }
 
+  /**
+   * Removes the given list of vertices and their edges from the graph.
+   *
+   * @param vertices the list of vertices to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed vertices and their edges
+   */
+  def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeVertices(vertices.asJava))
+  }
+
   /**
    * Removes all edges that match the given edge from the graph.
    *
@@ -611,6 +735,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+   * Removes all the edges that match the edges in the given list from the graph.
+   *
+   * @param edges the list of edges to be removed
+   * @return a new graph where the edges have been removed and in which the vertices remained intact
+   */
+  def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeEdges(edges.asJava))
+  }
+
+  /**
    * Performs union on the vertices and edges sets of the input graphs
    * removing duplicate vertices but maintaining duplicate edges.
    *
@@ -622,6 +756,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
+  * Performs difference on the vertex and edge sets of the input graphs, removing the
+  * common vertices and edges. If a source/target vertex is removed, its edges are removed too.
+  * @param graph the graph to perform difference with
+  * @return a new graph where the common vertices and edges have been removed
+  */
+  def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.difference(graph.getWrappedGraph))
+  }
+
+  /**
    * Compute an aggregate over the neighbor values of each
    * vertex.
    *
@@ -732,4 +876,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
     wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
       maxIterations, parameters))
   }
+
+  def validate(validator: GraphValidator[K, VV, EV]): Boolean = {
+    jgraph.validate(validator)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
new file mode 100644
index 0000000..f2b1133
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+/** Maps a Scala Tuple2 to a Vertex built from its two fields, in order. */
+@SerialVersionUID(1L)
+class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
+
+  override def map(value: (K, VV)): Vertex[K, VV] = {
+    new Vertex(value._1, value._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
new file mode 100644
index 0000000..00cb074
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+/** Maps a Scala Tuple3 to an Edge built from its three fields, in order. */
+@SerialVersionUID(1L)
+class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
+
+  override def map(value: (K, K, EV)): Edge[K, EV] = {
+    new Edge(value._1, value._2, value._3)
+  }
+}