You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/09/29 00:39:01 UTC
[1/3] flink git commit: [FLINK-2561] [gelly] add GraphMetrics Scala
example
Repository: flink
Updated Branches:
refs/heads/master 0f1775576 -> 233dab497
[FLINK-2561] [gelly] add GraphMetrics Scala example
This closes #1183
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/233dab49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/233dab49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/233dab49
Branch: refs/heads/master
Commit: 233dab497577f0a9443f772bb10390f6dcc005f1
Parents: 9e0284e
Author: vasia <va...@apache.org>
Authored: Fri Sep 25 11:20:15 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200
----------------------------------------------------------------------
.../graph/scala/example/GraphMetrics.scala | 129 +++++++++++++++++++
.../graph/scala/utils/EdgeToTuple3Map.scala | 3 +-
.../graph/scala/utils/Tuple2ToVertexMap.scala | 3 +-
.../graph/scala/utils/Tuple3ToEdgeMap.scala | 3 +-
.../graph/scala/utils/VertexToTuple2Map.scala | 3 +-
.../scala/test/operations/DegreesITCase.scala | 6 +-
.../test/operations/GraphMutationsITCase.scala | 12 +-
.../test/operations/JoinWithEdgesITCase.scala | 12 +-
.../operations/JoinWithVerticesITCase.scala | 4 +-
.../scala/test/operations/MapEdgesITCase.scala | 4 +-
.../test/operations/MapVerticesITCase.scala | 4 +-
.../operations/ReduceOnEdgesMethodsITCase.scala | 10 +-
.../ReduceOnNeighborMethodsITCase.scala | 8 +-
.../flink/graph/example/GraphMetrics.java | 2 +-
14 files changed, 164 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
new file mode 100644
index 0000000..68d9285
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.
+ *
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ * <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
+ */
+object GraphMetrics {
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ /** create the graph **/
+ val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+ /** get the number of vertices **/
+ val numVertices = graph.numberOfVertices;
+
+ /** get the number of edges **/
+ val numEdges = graph.numberOfEdges;
+
+ /** compute the average node degree **/
+ val verticesWithDegrees = graph.getDegrees;
+ val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
+
+ /** find the vertex with the maximum in-degree **/
+ val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+ /** find the vertex with the minimum in-degree **/
+ val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+ /** find the vertex with the maximum out-degree **/
+ val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+ /** find the vertex with the minimum out-degree **/
+ val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+ /** print the results **/
+ env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
+ env.fromElements(numEdges).printOnTaskManager("Total number of edges")
+ avgDegree.printOnTaskManager("Average node degree")
+ maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+ minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+ maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+ minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+
+ }
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if (args.length > 0) {
+ fileOutput = true
+ if (args.length == 1) {
+ edgesPath = args(0)
+ true
+ } else {
+ System.err.println("Usage: GraphMetrics <edges path>")
+ false
+ }
+ } else {
+ System.out.println("Executing GraphMetrics example with built-in default data.")
+ System.out.println(" Provide parameters to read input data from a file.")
+ System.out.println(" Usage: GraphMetrics <edges path>")
+ true
+ }
+ }
+
+ private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+ if (fileOutput) {
+ env.readCsvFile[(Long, Long)](
+ edgesPath,
+ fieldDelimiter = "\t").map(
+ in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+ }
+ else {
+ env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+ (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+ val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+ for ( i <- 0 to numOutEdges ) {
+ var target: Long = ((Math.random() * numVertices) + 1).toLong
+ new Edge[Long, NullValue](key, target, NullValue.getInstance())
+ }
+ })
+ }
+ }
+
+ private var fileOutput: Boolean = false
+ private var edgesPath: String = null
+ private var outputPath: String = null
+ private val numVertices = 100
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
index 0d7d2af..909dbb4 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Edge
+@SerialVersionUID(1L)
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
- private val serialVersionUID: Long = 1L
-
override def map(value: Edge[K, EV]): (K, K, EV) = {
(value.getSource, value.getTarget, value.getValue)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
index f2b1133..fd6b8c5 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Vertex
+@SerialVersionUID(1L)
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
- private val serialVersionUID: Long = 1L
-
override def map(value: (K, VV)): Vertex[K, VV] = {
new Vertex(value._1, value._2)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
index 00cb074..d0e07cc 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Edge
+@SerialVersionUID(1L)
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
- private val serialVersionUID: Long = 1L
-
override def map(value: (K, K, EV)): Edge[K, EV] = {
new Edge(value._1, value._2, value._3)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
index de77832..faf4e10 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Vertex
+@SerialVersionUID(1L)
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
- private val serialVersionUID: Long = 1L
-
override def map(value: Vertex[K, VV]): (K, VV) = {
(value.getId, value.getValue)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
index 6196f99..b347049 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -40,7 +40,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.inDegrees().collect.toList
+ val res = graph.inDegrees.collect().toList
expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -51,7 +51,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.outDegrees().collect.toList
+ val res = graph.outDegrees.collect().toList
expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.getDegrees().collect.toList
+ val res = graph.getDegrees.collect().toList
expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
index 3cb92c4..4b776e2 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -119,7 +119,7 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -146,7 +146,7 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
new Vertex[Long, Long](6L, 6L)))
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@@ -159,7 +159,7 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
1L), 61L)
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n" + "6,1,61\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -201,7 +201,7 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
2L), 12L)
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
"35\n" + "4,5,45\n" + "5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -214,7 +214,7 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@@ -226,7 +226,7 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
- val res = newgraph.getEdges.collect.toList
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index eae8bd5..3dc90fc 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -45,7 +45,7 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -75,7 +75,7 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -90,7 +90,7 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -105,7 +105,7 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -120,7 +120,7 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- val res = result.getEdges.collect.toList
+ val res = result.getEdges.collect().toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 8d18d58..98ee8b6 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -44,7 +44,7 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
VertexToTuple2Map[Long, Long]), new AddValuesMapper)
- val res = result.getVertices.collect.toList
+ val res = result.getVertices.collect().toList
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@@ -58,7 +58,7 @@ MultipleProgramsTestBase(mode) {
val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
(originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
- val res = result.getVertices.collect.toList
+ val res = result.getVertices.collect().toList
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
index 0fa8d2b..bdfd569 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList
+ val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
@@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.mapEdges(edge => edge.getValue + 1)
- .getEdges.collect.toList
+ .getEdges.collect().toList
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
index c1ab3ea..2e51d90 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList
+ val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
@@ -57,7 +57,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList
+ val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
index 695f74a..dcd1deb 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -43,7 +43,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
- EdgeDirection.ALL).collect.toList
+ EdgeDirection.ALL).collect().toList
expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -56,7 +56,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
- .collect.toList
+ .collect().toList
expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
"(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
"(5,1)\n" + "(5,3)\n" + "(5,4)"
@@ -71,7 +71,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
- val res = verticesWithLowestOutNeighbor.collect.toList
+ val res = verticesWithLowestOutNeighbor.collect().toList
expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -84,7 +84,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.IN)
- val res = verticesWithLowestOutNeighbor.collect.toList
+ val res = verticesWithLowestOutNeighbor.collect().toList
expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -97,7 +97,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
- val res = verticesWithMaxEdgeWeight.collect.toList
+ val res = verticesWithMaxEdgeWeight.collect().toList
expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
index b01e750..aef5493 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -43,7 +43,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
- .collect.toList
+ .collect().toList
expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -54,7 +54,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList
+ val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList
expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -66,7 +66,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
- val res = result.collect.toList
+ val res = result.collect().toList
expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -79,7 +79,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new
SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
- val res = result.collect.toList
+ val res = result.collect().toList
expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/233dab49/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 591ed26..b808e76 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.example.utils.ExampleUtils;
import org.apache.flink.types.NullValue;
/**
- * This example illustrate how to use Gelly metrics methods and get simple statistics
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
* from the input graph.
*
* The program creates a random graph and computes and prints
[2/3] flink git commit: [FLINK-2561] [gelly] convert existing tests
to use collect instead of files; add tests for newly added operations.
Posted by va...@apache.org.
[FLINK-2561] [gelly] convert existing tests to use collect instead of files;
add tests for newly added operations.
Add completeness test: fromCsvReader method is missing.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e0284ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e0284ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e0284ef
Branch: refs/heads/master
Commit: 9e0284efa90561c88b1bc829b800d89e84477caa
Parents: 0b4dc06
Author: vasia <va...@apache.org>
Authored: Thu Sep 24 23:31:38 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly-scala/pom.xml | 8 +-
.../org/apache/flink/graph/scala/Graph.scala | 9 +-
.../test/GellyScalaAPICompletenessTest.scala | 45 +++++
.../scala/test/operations/DegreesITCase.scala | 39 ++---
.../test/operations/GraphMutationsITCase.scala | 165 ++++++++++++++-----
.../test/operations/GraphOperationsITCase.scala | 151 +++++++++++------
.../test/operations/JoinWithEdgesITCase.scala | 45 ++---
.../operations/JoinWithVerticesITCase.scala | 29 +---
.../scala/test/operations/MapEdgesITCase.scala | 33 +---
.../test/operations/MapVerticesITCase.scala | 33 +---
.../operations/ReduceOnEdgesMethodsITCase.scala | 63 +++----
.../ReduceOnNeighborMethodsITCase.scala | 46 ++----
12 files changed, 355 insertions(+), 311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml
index a1f0da7..edcb865 100644
--- a/flink-staging/flink-gelly-scala/pom.xml
+++ b/flink-staging/flink-gelly-scala/pom.xml
@@ -48,7 +48,13 @@ under the License.
<artifactId>flink-gelly</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index ed58ffd..35af1ed 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -100,7 +100,8 @@ object Graph {
env: ExecutionEnvironment): Graph[K, VV, EV] = {
val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
- wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv))
+ wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
+ env.getJavaEnv))
}
/**
@@ -678,7 +679,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
/**
* Adds the given list edges to the graph.
*
- * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
+ * When adding an edge for a non-existing set of vertices,
+ * the edge is considered invalid and ignored.
*
* @param newEdges the data set of edges to be added
* @return a new graph containing the existing edges plus the newly added edges.
@@ -757,7 +759,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
/**
* Performs Difference on the vertex and edge sets of the input graphs
- * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed
+ * removes common vertices and edges. If a source/target vertex is removed,
+ * its corresponding edge will also be removed
* @param graph the graph to perform difference with
* @return a new graph where the common vertices and edges have been removed
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..c63c4f8
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import java.lang.reflect.Method
+import org.apache.flink.graph.scala._
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.graph.{Graph => JavaGraph}
+import scala.language.existentials
+import org.junit.Test
+
+/**
+ * This checks whether the Gelly Scala API is up to feature parity with the Java API.
+ * Implements the {@link ScalaAPICompletenessTest} for Gelly.
+ */
+class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+ override def isExcludedByName(method: Method): Boolean = {
+ val name = method.getDeclaringClass.getName + "." + method.getName
+ val excludedNames = Seq("org.apache.flink.graph.Graph.getContext",
+ // NOTE: until fromCsvReader() is added to to the Scala API Graph
+ "org.apache.flink.graph.Graph.fromCsvReader")
+ excludedNames.contains(name)
+ }
+
+ @Test
+ override def testCompleteness(): Unit = {
+ checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
index 98dbbe9..6196f99 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -26,42 +26,23 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testInDegrees {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.inDegrees().writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"
+ val res = graph.inDegrees().collect.toList
+ expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -70,9 +51,9 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.outDegrees().writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"
+ val res = graph.outDegrees().collect.toList
+ expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -81,8 +62,8 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.getDegrees().writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"
+ val res = graph.getDegrees().collect.toList
+ expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
index 687b0a7..3cb92c4 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -27,33 +27,14 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testAddVertex {
@@ -62,9 +43,9 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
- newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getVertices.collect().toList
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -74,9 +55,9 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
- newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getVertices.collect().toList
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -86,9 +67,37 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
- newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+ val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
+ new Vertex[Long, Long](7L, 7L)))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVerticesExisting {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+ val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
+ new Vertex[Long, Long](6L, 6L)))
+ val res = newgraph.getVertices.collect().toList
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -98,9 +107,9 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -110,10 +119,36 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect.toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+ new Vertex[Long, Long](2L, 2L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveValidAndInvalidVertex {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+ new Vertex[Long, Long](6L, 6L)))
+ val res = newgraph.getEdges.collect.toList
+ expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -124,10 +159,38 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
1L), 61L)
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect.toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n" + "6,1,61\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
+ new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
+ "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddEdgesInvalidVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
+ new Edge(7L, 8L, 78L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+ "35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -138,10 +201,10 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
2L), 12L)
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect.toList
expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
"35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -151,9 +214,9 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect.toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -163,9 +226,35 @@ MultipleProgramsTestBase(mode) {
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect.toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+ new Edge(4L, 5L, 45L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveSameEdgeTwiceEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+ new Edge(1L, 2L, 12L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
index 713eb8d..7f7ebc0 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -27,44 +27,26 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
-
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
+
@Test
@throws(classOf[Exception])
def testUndirected {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = graph.getUndirected.getEdges.collect().toList;
+
expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
"23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
"5,1,51\n" + "1,5,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -73,10 +55,11 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = graph.reverse().getEdges.collect().toList;
+
expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
"45\n" + "1,5,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -85,7 +68,7 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+ val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
@throws(classOf[Exception])
def filter(vertex: Vertex[Long, Long]): Boolean = {
return (vertex.getValue > 2)
@@ -96,9 +79,10 @@ MultipleProgramsTestBase(mode) {
override def filter(edge: Edge[Long, Long]): Boolean = {
return (edge.getValue > 34)
}
- }).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ }).getEdges.collect().toList;
+
expectedResult = "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -107,12 +91,13 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.subgraph(
+ val res = graph.subgraph(
vertex => vertex.getValue > 2,
edge => edge.getValue > 34
- ).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ ).getEdges.collect().toList;
+
expectedResult = "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -121,14 +106,15 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+ val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
@throws(classOf[Exception])
def filter(vertex: Vertex[Long, Long]): Boolean = {
vertex.getValue > 2
}
- }).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ }).getEdges.collect().toList;
+
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -137,11 +123,12 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.filterOnVertices(
+ val res = graph.filterOnVertices(
vertex => vertex.getValue > 2
- ).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ ).getEdges.collect().toList;
+
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -150,14 +137,15 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+ val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
@throws(classOf[Exception])
def filter(edge: Edge[Long, Long]): Boolean = {
edge.getValue > 34
}
- }).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ }).getEdges.collect().toList;
+
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -166,11 +154,12 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.filterOnEdges(
+ val res = graph.filterOnEdges(
edge => edge.getValue > 34
- ).getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ ).getEdges.collect().toList;
+
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -179,9 +168,9 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- env.fromElements(graph.numberOfVertices).writeAsText(resultPath)
- env.execute
+ val res = env.fromElements(graph.numberOfVertices).collect().toList
expectedResult = "5"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -190,9 +179,9 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- env.fromElements(graph.numberOfEdges).writeAsText(resultPath)
- env.execute
+ val res = env.fromElements(graph.numberOfEdges).collect().toList
expectedResult = "7"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -201,9 +190,9 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.getVertexIds.writeAsText(resultPath)
- env.execute
+ val res = graph.getVertexIds.collect().toList
expectedResult = "1\n2\n3\n4\n5\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -212,9 +201,10 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.getEdgeIds.writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"
+ val res = graph.getEdgeIds.collect().toList
+ expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
+ "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -231,9 +221,62 @@ MultipleProgramsTestBase(mode) {
)
val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
- newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
- env.execute
+ val res = newgraph.getEdges.collect().toList
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n" + "6,1,61\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testDifference {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+ new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
+ new Vertex[Long, Long](6L, 6L)
+ )
+ val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+ new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
+ new Edge[Long, Long](6L, 3L, 63L)
+ )
+
+ val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testDifferenceNoCommonVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+ new Vertex[Long, Long](6L, 6L)
+ )
+ val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+ new Edge[Long, Long](6L, 6L, 66L)
+ )
+
+ val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testTriplets {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getTriplets.collect().toList
+ expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
+ "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index e19463e..eae8bd5 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -29,33 +29,14 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testWithEdgesInputDataset {
@@ -64,10 +45,10 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -79,10 +60,10 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -94,10 +75,10 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -109,10 +90,10 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -124,10 +105,10 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -139,10 +120,10 @@ MultipleProgramsTestBase(mode) {
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
- result.getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = result.getEdges.collect.toList
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 4b8f354..8d18d58 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -28,33 +28,14 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testJoinWithVertexSet {
@@ -63,9 +44,9 @@ MultipleProgramsTestBase(mode) {
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
VertexToTuple2Map[Long, Long]), new AddValuesMapper)
- result.getVerticesAsTuple2().writeAsCsv(resultPath)
- env.execute
+ val res = result.getVertices.collect.toList
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -77,9 +58,9 @@ MultipleProgramsTestBase(mode) {
val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
(originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
- result.getVerticesAsTuple2().writeAsCsv(resultPath)
- env.execute
+ val res = result.getVertices.collect.toList
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
index 7e5ad14..0fa8d2b 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -18,7 +18,6 @@
package org.apache.flink.graph.scala.test.operations
-
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
@@ -29,42 +28,21 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testWithSameValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.mapEdges(new AddOneMapper)
- .getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
@@ -72,6 +50,7 @@ MultipleProgramsTestBase(mode) {
"3,5,36\n" +
"4,5,46\n" +
"5,1,52\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -80,9 +59,8 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.mapEdges(edge => edge.getValue + 1)
- .getEdgesAsTuple3().writeAsCsv(resultPath)
- env.execute
+ val res = graph.mapEdges(edge => edge.getValue + 1)
+ .getEdges.collect.toList
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
@@ -90,6 +68,7 @@ MultipleProgramsTestBase(mode) {
"3,5,36\n" +
"4,5,46\n" +
"5,1,52\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
index a22cfbd..c1ab3ea 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -28,48 +28,27 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testWithSameValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.mapVertices(new AddOneMapper)
- .getVerticesAsTuple2().writeAsCsv(resultPath)
- env.execute
-
+ val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
@Test
@@ -78,15 +57,13 @@ MultipleProgramsTestBase(mode) {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.mapVertices(vertex => vertex.getValue + 1)
- .getVerticesAsTuple2().writeAsCsv(resultPath)
- env.execute
-
+ val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
index 6ed383a..695f74a 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -28,46 +28,24 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testAllNeighborsWithValueGreaterThanFour {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
- EdgeDirection.ALL)
- result.writeAsCsv(resultPath)
- env.execute
-
-
- expectedResult = "5,1\n" + "5,3\n" + "5,4"
+ val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+ EdgeDirection.ALL).collect.toList
+ expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@@ -77,13 +55,12 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
- result.writeAsCsv(resultPath)
- env.execute
-
-
- expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" +
- "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4"
+ val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+ .collect.toList
+ expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
+ "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
+ "(5,1)\n" + "(5,3)\n" + "(5,4)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -94,9 +71,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
+ val res = verticesWithLowestOutNeighbor.collect.toList
+ expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -107,9 +84,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.IN)
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
+ val res = verticesWithLowestOutNeighbor.collect.toList
+ expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -120,9 +97,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
- verticesWithMaxEdgeWeight.writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
+ val res = verticesWithMaxEdgeWeight.collect.toList
+ expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
index 52e6d7a..b01e750 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -28,42 +28,24 @@ import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
private var expectedResult: String = null
- var tempFolder: TemporaryFolder = new TemporaryFolder()
-
- @Rule
- def getFolder(): TemporaryFolder = {
- tempFolder;
- }
-
- @Before
- @throws(classOf[Exception])
- def before {
- resultPath = tempFolder.newFile.toURI.toString
- }
-
- @After
- @throws(classOf[Exception])
- def after {
- TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-
@Test
@throws(classOf[Exception])
def testSumOfAllNeighborsNoValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
+ val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
+ .collect.toList
+ expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -72,9 +54,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
- graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
+ val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList
+ expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -84,9 +66,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
- result.writeAsCsv(resultPath)
- env.execute
- expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
+ val res = result.collect.toList
+ expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@@ -97,9 +79,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new
SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
- result.writeAsCsv(resultPath)
- env.execute
- expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
+ val res = result.collect.toList
+ expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
final class SumNeighbors extends ReduceNeighborsFunction[Long] {
[3/3] flink git commit: [FLINK-2561] [gelly] add missing methods to
Graph: add-remove edges/vertices, difference, graph creation methods, validate,
getTriplets. Add missing utility mappers.
Posted by va...@apache.org.
[FLINK-2561] [gelly] add missing methods to Graph:
add-remove edges/vertices, difference, graph creation methods,
validate, getTriplets. Add missing utility mappers.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b4dc067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b4dc067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b4dc067
Branch: refs/heads/master
Commit: 0b4dc067fee82654fae3292bf6bf7d59157bf5c0
Parents: 0f17755
Author: vasia <va...@apache.org>
Authored: Thu Sep 24 22:08:10 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/graph/scala/Graph.scala | 151 ++++++++++++++++++-
.../graph/scala/utils/Tuple2ToVertexMap.scala | 31 ++++
.../graph/scala/utils/Tuple3ToEdgeMap.scala | 31 ++++
3 files changed, 212 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 73e175e..ed58ffd 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -23,26 +23,108 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._
import org.apache.flink.graph._
+import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
import org.apache.flink.{graph => jg}
-
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
+import org.apache.flink.types.NullValue
object Graph {
+
+ /**
+ * Creates a Graph from a DataSet of vertices and a DataSet of edges.
+ */
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))
}
+ /**
+ * Creates a Graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
+ def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+ (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+ wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a graph from a DataSet of edges.
+ * Vertices are created automatically and their values are set by applying the provided
+ * map function to the vertex ids.
+ */
+ def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+ TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment,
+ mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a Seq of vertices and a Seq of edges.
+ */
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges
.asJavaCollection, env.getJavaEnv))
}
+
+ /**
+ * Creates a Graph from a Seq of edges.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
+ def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+ (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+ wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a graph from a Seq of edges.
+ * Vertices are created automatically and their values are set by applying the provided
+ * map function to the vertex ids.
+ */
+ def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+ TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment,
+ mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a DataSets of Tuples.
+ */
+ def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+ TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
+ env: ExecutionEnvironment): Graph[K, VV, EV] = {
+ val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+ val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a DataSet of Tuples representing the edges.
+ * Vertices are created automatically and their values are set to NullValue.
+ */
+ def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
+ (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
+ val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv))
+ }
+
+ /**
+ * Creates a Graph from a DataSet of Tuples representing the edges.
+ * Vertices are created automatically and their values are set by applying the provided
+ * map function to the vertex ids.
+ */
+ def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+ TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment,
+ mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
+ }
+
}
/**
@@ -93,6 +175,14 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+ * @return a DataSet of Triplets,
+ * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
+ */
+ def getTriplets(): DataSet[Triplet[K, VV, EV]] = {
+ wrap(jgraph.getTriplets())
+ }
+
+ /**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
@@ -575,6 +665,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+ * Adds the list of vertices, passed as input, to the graph.
+ * If the vertices already exist in the graph, they will not be added once more.
+ *
+ * @param verticesToAdd the list of vertices to add
+ * @return the new graph containing the existing and newly added vertices
+ */
+ def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+ wrapGraph(jgraph.addVertices(vertices.asJava))
+ }
+
+ /**
+ * Adds the given list edges to the graph.
+ *
+ * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
+ *
+ * @param newEdges the data set of edges to be added
+ * @return a new graph containing the existing edges plus the newly added edges.
+ */
+ def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+ wrapGraph(jgraph.addEdges(edges.asJava))
+ }
+
+ /**
* Adds the given edge to the graph. If the source and target vertices do
* not exist in the graph, they will also be added.
*
@@ -599,6 +712,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
wrapGraph(jgraph.removeVertex(vertex))
}
+ /**
+ * Removes the given vertex and its edges from the graph.
+ *
+ * @param vertex the vertex to remove
+ * @return the new graph containing the existing vertices and edges without
+ * the removed vertex and its edges
+ */
+ def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
+ wrapGraph(jgraph.removeVertices(vertices.asJava))
+ }
+
/**
* Removes all edges that match the given edge from the graph.
*
@@ -611,6 +735,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+ * Removes all the edges that match the edges in the given data set from the graph.
+ *
+ * @param edgesToBeRemoved the list of edges to be removed
+ * @return a new graph where the edges have been removed and in which the vertices remained intact
+ */
+ def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
+ wrapGraph(jgraph.removeEdges(edges.asJava))
+ }
+
+ /**
* Performs union on the vertices and edges sets of the input graphs
* removing duplicate vertices but maintaining duplicate edges.
*
@@ -622,6 +756,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}
/**
+ * Performs Difference on the vertex and edge sets of the input graphs
+ * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed
+ * @param graph the graph to perform difference with
+ * @return a new graph where the common vertices and edges have been removed
+ */
+ def difference(graph: Graph[K, VV, EV]) = {
+ wrapGraph(jgraph.difference(graph.getWrappedGraph))
+ }
+
+ /**
* Compute an aggregate over the neighbor values of each
* vertex.
*
@@ -732,4 +876,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
maxIterations, parameters))
}
+
+ def validate(validator: GraphValidator[K, VV, EV]): Boolean = {
+ jgraph.validate(validator)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
new file mode 100644
index 0000000..f2b1133
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
+
+ private val serialVersionUID: Long = 1L
+
+ override def map(value: (K, VV)): Vertex[K, VV] = {
+ new Vertex(value._1, value._2)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b4dc067/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
new file mode 100644
index 0000000..00cb074
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
+
+ private val serialVersionUID: Long = 1L
+
+ override def map(value: (K, K, EV)): Edge[K, EV] = {
+ new Edge(value._1, value._2, value._3)
+ }
+}