You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:06:01 UTC
[23/24] flink git commit: [FLINK-2833] [gelly] create a
flink-libraries module and move gelly there
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..2dc272c
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+import org.apache.flink.graph.gsa.GatherFunction
+import org.apache.flink.graph.gsa.Neighbor
+import org.apache.flink.graph.gsa.SumFunction
+import org.apache.flink.graph.gsa.ApplyFunction
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+ val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+ // Execute the gather-sum-apply iteration
+ val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+ new UpdateDistance, maxIterations)
+
+ // Extract the vertices as the result
+ val singleSourceShortestPaths = result.getVertices
+
+ // emit result
+ if (fileOutput) {
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+ env.execute("GSA Single Source Shortest Paths Example")
+ } else {
+ singleSourceShortestPaths.print()
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+ override def map(id: Long) = {
+ if (id.equals(srcId)) {
+ 0.0
+ } else {
+ Double.PositiveInfinity
+ }
+ }
+ }
+
+ private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+ override def gather(neighbor: Neighbor[Double, Double]) = {
+ neighbor.getNeighborValue + neighbor.getEdgeValue
+ }
+ }
+
+ private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+ override def sum(newValue: Double, currentValue: Double) = {
+ Math.min(newValue, currentValue)
+ }
+ }
+
+ private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+ override def apply(newDistance: Double, oldDistance: Double) = {
+ if (newDistance < oldDistance) {
+ setResult(newDistance)
+ }
+ }
+ }
+
+ // **************************************************************************
+ // UTIL METHODS
+ // **************************************************************************
+
+ private var fileOutput = false
+ private var srcVertexId = 1L
+ private var edgesInputPath: String = null
+ private var outputPath: String = null
+ private var maxIterations = 5
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if(args.length > 0) {
+ if(args.length != 4) {
+ System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>")
+ false
+ }
+ fileOutput = true
+ srcVertexId = args(0).toLong
+ edgesInputPath = args(1)
+ outputPath = args(2)
+ maxIterations = (3).toInt
+ } else {
+ System.out.println("Executing Single Source Shortest Paths example "
+ + "with default parameters and built-in default data.")
+ System.out.println(" Provide parameters to read input data from files.")
+ System.out.println(" See the documentation for the correct format of input files.")
+ System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
+ }
+ true
+ }
+
+ private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+ if (fileOutput) {
+ env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+ lineDelimiter = "\n",
+ fieldDelimiter = "\t")
+ .map(new Tuple3ToEdgeMap[Long, Double]())
+ } else {
+ val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+ case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+ z.asInstanceOf[Double])
+ }
+ env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
new file mode 100644
index 0000000..4eed824
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.
+ *
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ * <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
+ */
+object GraphMetrics {
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ /** create the graph **/
+ val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+ /** get the number of vertices **/
+ val numVertices = graph.numberOfVertices;
+
+ /** get the number of edges **/
+ val numEdges = graph.numberOfEdges;
+
+ /** compute the average node degree **/
+ val verticesWithDegrees = graph.getDegrees;
+ val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
+
+ /** find the vertex with the maximum in-degree **/
+ val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+ /** find the vertex with the minimum in-degree **/
+ val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+ /** find the vertex with the maximum out-degree **/
+ val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+ /** find the vertex with the minimum out-degree **/
+ val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+ /** print the results **/
+ env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
+ env.fromElements(numEdges).printOnTaskManager("Total number of edges")
+ avgDegree.printOnTaskManager("Average node degree")
+ maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+ minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+ maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+ minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+
+ }
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if (args.length > 0) {
+ fileOutput = true
+ if (args.length == 1) {
+ edgesPath = args(0)
+ true
+ } else {
+ System.err.println("Usage: GraphMetrics <edges path>")
+ false
+ }
+ } else {
+ System.out.println("Executing GraphMetrics example with built-in default data.")
+ System.out.println(" Provide parameters to read input data from a file.")
+ System.out.println(" Usage: GraphMetrics <edges path>")
+ true
+ }
+ }
+
+ private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+ if (fileOutput) {
+ env.readCsvFile[(Long, Long)](
+ edgesPath,
+ fieldDelimiter = "\t").map(
+ in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+ } else {
+ env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+ (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+ val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+ for ( i <- 0 to numOutEdges ) {
+ var target: Long = ((Math.random() * numVertices) + 1).toLong
+ new Edge[Long, NullValue](key, target, NullValue.getInstance())
+ }
+ })
+ }
+ }
+
+ private var fileOutput: Boolean = false
+ private var edgesPath: String = null
+ private var outputPath: String = null
+ private val numVertices = 100
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..65a8e7f
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+ val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+ // Execute the vertex-centric iteration
+ val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+ new MinDistanceMessenger, maxIterations)
+
+ // Extract the vertices as the result
+ val singleSourceShortestPaths = result.getVertices
+
+ // emit result
+ if (fileOutput) {
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+ env.execute("Single Source Shortest Paths Example")
+ } else {
+ singleSourceShortestPaths.print()
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+ override def map(id: Long) = {
+ if (id.equals(srcId)) {
+ 0.0
+ } else {
+ Double.PositiveInfinity
+ }
+ }
+ }
+
+ /**
+ * Function that updates the value of a vertex by picking the minimum
+ * distance from all incoming messages.
+ */
+ private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+ override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+ var minDistance = Double.MaxValue
+ while (inMessages.hasNext) {
+ var msg = inMessages.next
+ if (msg < minDistance) {
+ minDistance = msg
+ }
+ }
+ if (vertex.getValue > minDistance) {
+ setNewVertexValue(minDistance)
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum distance associated with a given vertex among all
+ * the target vertices summed up with the edge's value.
+ */
+ private final class MinDistanceMessenger extends
+ MessagingFunction[Long, Double, Double, Double] {
+
+ override def sendMessages(vertex: Vertex[Long, Double]) {
+ for (edge: Edge[Long, Double] <- getEdges) {
+ sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+ }
+ }
+ }
+
+ // ****************************************************************************
+ // UTIL METHODS
+ // ****************************************************************************
+
+ private var fileOutput = false
+ private var srcVertexId = 1L
+ private var edgesInputPath: String = null
+ private var outputPath: String = null
+ private var maxIterations = 5
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if(args.length > 0) {
+ if(args.length != 4) {
+ System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>")
+ false
+ }
+ fileOutput = true
+ srcVertexId = args(0).toLong
+ edgesInputPath = args(1)
+ outputPath = args(2)
+ maxIterations = (3).toInt
+ } else {
+ System.out.println("Executing Single Source Shortest Paths example "
+ + "with default parameters and built-in default data.")
+ System.out.println(" Provide parameters to read input data from files.")
+ System.out.println(" See the documentation for the correct format of input files.")
+ System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
+ }
+ true
+ }
+
+ private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+ if (fileOutput) {
+ env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+ lineDelimiter = "\n",
+ fieldDelimiter = "\t")
+ .map(new Tuple3ToEdgeMap[Long, Double]())
+ } else {
+ val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+ case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+ z.asInstanceOf[Double])
+ }
+ env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
new file mode 100644
index 0000000..159a100
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.graph.{Graph => JGraph}
+
+import _root_.scala.reflect.ClassTag
+
+
+package object scala {
+ private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+ EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
new file mode 100644
index 0000000..909dbb4
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+@SerialVersionUID(1L)
+class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
+
+ override def map(value: Edge[K, EV]): (K, K, EV) = {
+ (value.getSource, value.getTarget, value.getValue)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
new file mode 100644
index 0000000..fd6b8c5
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+@SerialVersionUID(1L)
+class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
+
+ override def map(value: (K, VV)): Vertex[K, VV] = {
+ new Vertex(value._1, value._2)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
new file mode 100644
index 0000000..d0e07cc
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+@SerialVersionUID(1L)
+class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
+
+ override def map(value: (K, K, EV)): Edge[K, EV] = {
+ new Edge(value._1, value._2, value._3)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
new file mode 100644
index 0000000..faf4e10
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+@SerialVersionUID(1L)
+class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
+
+ override def map(value: Vertex[K, VV]): (K, VV) = {
+ (value.getId, value.getValue)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..55faee3
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import java.lang.reflect.Method
+import org.apache.flink.graph.scala._
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.graph.{Graph => JavaGraph}
+import scala.language.existentials
+import org.junit.Test
+
+/**
+ * This checks whether the Gelly Scala API is up to feature parity with the Java API.
+ * Implements the {@link ScalaAPICompletenessTest} for Gelly.
+ */
+class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+ override def isExcludedByName(method: Method): Boolean = {
+ val name = method.getDeclaringClass.getName + "." + method.getName
+ val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
+ excludedNames.contains(name)
+ }
+
+ @Test
+ override def testCompleteness(): Unit = {
+ checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
new file mode 100644
index 0000000..1c2cf54
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.{Edge, Vertex}
+
+object TestGraphUtils {
+
+ def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
+ return env.fromCollection(getLongLongVertices)
+ }
+
+ def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
+ return env.fromCollection(getLongLongEdges)
+ }
+
+ def getLongLongVertices: List[Vertex[Long, Long]] = {
+ List(
+ new Vertex[Long, Long](1L, 1L),
+ new Vertex[Long, Long](2L, 2L),
+ new Vertex[Long, Long](3L, 3L),
+ new Vertex[Long, Long](4L, 4L),
+ new Vertex[Long, Long](5L, 5L)
+ )
+ }
+
+ def getLongLongEdges: List[Edge[Long, Long]] = {
+ List(
+ new Edge[Long, Long](1L, 2L, 12L),
+ new Edge[Long, Long](1L, 3L, 13L),
+ new Edge[Long, Long](2L, 3L, 23L),
+ new Edge[Long, Long](3L, 4L, 34L),
+ new Edge[Long, Long](3L, 5L, 35L),
+ new Edge[Long, Long](4L, 5L, 45L),
+ new Edge[Long, Long](5L, 1L, 51L)
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
new file mode 100644
index 0000000..b347049
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testInDegrees {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.inDegrees.collect().toList
+ expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testOutDegrees {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.outDegrees.collect().toList
+ expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testGetDegrees {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getDegrees.collect().toList
+ expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
new file mode 100644
index 0000000..6ceaf16
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+import java.io.IOException
+import org.apache.flink.core.fs.FileInputSplit
+import java.io.File
+import java.io.OutputStreamWriter
+import java.io.FileOutputStream
+import java.io.FileOutputStream
+import com.google.common.base.Charsets
+import org.apache.flink.core.fs.Path
+import org.apache.flink.types.NullValue
+import org.apache.flink.api.common.functions.MapFunction
+
+@RunWith(classOf[Parameterized])
+class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvWithValues {
+ /*
+ * Test with two Csv files, both vertices and edges have values
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val verticesContent = "1,1\n2,2\n3,3\n"
+ val verticesSplit = createTempFile(verticesContent)
+ val edgesContent = "1,2,ot\n3,2,tt\n3,1,to\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, Long, String](
+ readVertices = true,
+ pathVertices = verticesSplit.getPath.toString,
+ pathEdges = edgesSplit.getPath.toString,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvNoEdgeValues {
+ /*
+ * Test with two Csv files; edges have no values
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val verticesContent = "1,one\n2,two\n3,three\n"
+ val verticesSplit = createTempFile(verticesContent)
+ val edgesContent = "1,2\n3,2\n3,1\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, String, NullValue](
+ readVertices = true,
+ pathVertices = verticesSplit.getPath.toString,
+ pathEdges = edgesSplit.getPath.toString,
+ hasEdgeValues = false,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvWithMapperValues {
+ /*
+ * Test with edges Csv file and vertex mapper initializer
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val edgesContent = "1,2,12\n3,2,32\n3,1,31\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, Double, Long](
+ readVertices = false,
+ pathEdges = edgesSplit.getPath.toString,
+ mapper = new VertexDoubleIdAssigner(),
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvNoVertexValues {
+ /*
+ * Test with edges Csv file: no vertex values
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val edgesContent = "1,2,12\n3,2,32\n3,1,31\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, NullValue, Long](
+ readVertices = false,
+ pathEdges = edgesSplit.getPath.toString,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
+ "3,1,(null),(null),31\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvNoValues {
+ /*
+ * Test with edges Csv file: neither vertex nor edge values
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val edgesContent = "1,2\n3,2\n3,1\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, NullValue, NullValue](
+ readVertices = false,
+ pathEdges = edgesSplit.getPath.toString,
+ hasEdgeValues = false,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,(null),(null),(null)\n" +
+ "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvOptionsVertices {
+ /*
+ * Test the options for vertices: delimiters, comments, ignore first line.
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val verticesContent = "42#42\t" + "%this-is-a-comment\t" +
+ "1#1\t" + "2#2\t" + "3#3\t"
+ val verticesSplit = createTempFile(verticesContent)
+ val edgesContent = "1,2,ot\n3,2,tt\n3,1,to\n"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, Long, String](
+ readVertices = true,
+ pathVertices = verticesSplit.getPath.toString,
+ lineDelimiterVertices = "\t",
+ fieldDelimiterVertices = "#",
+ ignoreFirstLineVertices = true,
+ ignoreCommentsVertices = "%",
+ pathEdges = edgesSplit.getPath.toString,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testCsvOptionsEdges {
+ /*
+ * Test the options for edges: delimiters, comments, ignore first line.
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val verticesContent = "1,1\n2,2\n3,3\n"
+ val verticesSplit = createTempFile(verticesContent)
+ val edgesContent = "42#42#ignore&" + "1#2#ot&" + "3#2#tt&" + "3#1#to&" +
+ "//this-is-a-comment"
+ val edgesSplit = createTempFile(edgesContent)
+ val graph = Graph.fromCsvReader[Long, Long, String](
+ pathVertices = verticesSplit.getPath.toString,
+ readVertices = true,
+ lineDelimiterEdges = "&",
+ fieldDelimiterEdges = "#",
+ ignoreFirstLineEdges = true,
+ ignoreCommentsEdges = "//",
+ pathEdges = edgesSplit.getPath.toString,
+ env = env)
+
+ val result = graph.getTriplets.collect()
+ expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+ TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+ }
+
+ @throws(classOf[IOException])
+ def createTempFile(content: String): FileInputSplit = {
+ val tempFile = File.createTempFile("test_contents", "tmp")
+ tempFile.deleteOnExit()
+
+ val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), Charsets.UTF_8)
+ wrt.write(content)
+ wrt.close()
+
+ new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
+ Array("localhost"));
+ }
+
+ final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
+ @throws(classOf[Exception])
+ def map(id: Long): Double = {id.toDouble}
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
new file mode 100644
index 0000000..4b776e2
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVertex {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+ val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVertexExisting {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVertexNoEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+ val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
+ new Vertex[Long, Long](7L, 7L)))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddVerticesExisting {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+ val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
+ new Vertex[Long, Long](6L, 6L)))
+ val res = newgraph.getVertices.collect().toList
+ expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveVertex {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveInvalidVertex {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+ new Vertex[Long, Long](2L, 2L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveValidAndInvalidVertex {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+ new Vertex[Long, Long](6L, 6L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddEdge {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
+ 1L), 61L)
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n" + "6,1,61\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
+ new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
+ "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddEdgesInvalidVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
+ new Edge(7L, 8L, 78L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+ "35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testAddExistingEdge {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
+ 2L), 12L)
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+ "35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveEdge {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveInvalidEdge {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+ new Edge(4L, 5L, 45L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testRemoveSameEdgeTwiceEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+ new Edge(1L, 2L, 12L)))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
new file mode 100644
index 0000000..7f7ebc0
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testUndirected {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getUndirected.getEdges.collect().toList;
+
+ expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
+ "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
+ "5,1,51\n" + "1,5,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testReverse {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.reverse().getEdges.collect().toList;
+
+ expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
+ "45\n" + "1,5,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testSubGraph {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+ @throws(classOf[Exception])
+ def filter(vertex: Vertex[Long, Long]): Boolean = {
+ return (vertex.getValue > 2)
+ }
+ }, new FilterFunction[Edge[Long, Long]] {
+
+ @throws(classOf[Exception])
+ override def filter(edge: Edge[Long, Long]): Boolean = {
+ return (edge.getValue > 34)
+ }
+ }).getEdges.collect().toList;
+
+ expectedResult = "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testSubGraphSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.subgraph(
+ vertex => vertex.getValue > 2,
+ edge => edge.getValue > 34
+ ).getEdges.collect().toList;
+
+ expectedResult = "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testFilterOnVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+ @throws(classOf[Exception])
+ def filter(vertex: Vertex[Long, Long]): Boolean = {
+ vertex.getValue > 2
+ }
+ }).getEdges.collect().toList;
+
+ expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testFilterOnVerticesSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.filterOnVertices(
+ vertex => vertex.getValue > 2
+ ).getEdges.collect().toList;
+
+ expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testFilterOnEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+ @throws(classOf[Exception])
+ def filter(edge: Edge[Long, Long]): Boolean = {
+ edge.getValue > 34
+ }
+ }).getEdges.collect().toList;
+
+ expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testFilterOnEdgesSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.filterOnEdges(
+ edge => edge.getValue > 34
+ ).getEdges.collect().toList;
+
+ expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testNumberOfVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = env.fromElements(graph.numberOfVertices).collect().toList
+ expectedResult = "5"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testNumberOfEdges {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = env.fromElements(graph.numberOfEdges).collect().toList
+ expectedResult = "7"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testVertexIds {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getVertexIds.collect().toList
+ expectedResult = "1\n2\n3\n4\n5\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testEdgesIds {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getEdgeIds.collect().toList
+ expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
+ "(5,1)\n"
+ TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testUnion {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+ new Vertex[Long, Long](6L, 6L)
+ )
+ val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+ new Edge[Long, Long](6L, 1L, 61L)
+ )
+
+ val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n" + "6,1,61\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testDifference {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+ new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
+ new Vertex[Long, Long](6L, 6L)
+ )
+ val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+ new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
+ new Edge[Long, Long](6L, 3L, 63L)
+ )
+
+ val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "4,5,45\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testDifferenceNoCommonVertices {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+ new Vertex[Long, Long](6L, 6L)
+ )
+ val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+ new Edge[Long, Long](6L, 6L, 66L)
+ )
+
+ val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+ val res = newgraph.getEdges.collect().toList
+ expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+ "45\n" + "5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testTriplets {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.getTriplets.collect().toList
+ expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
+ "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
new file mode 100644
index 0000000..3dc90fc
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesInputDataset {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+ EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+ "90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesInputDatasetSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+ EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
+ originalValue + tupleValue)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+ "90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesOnSource {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+ .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+ originalValue + tupleValue)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+ "90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesOnSourceSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+ .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+ originalValue + tupleValue)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+ "90\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesOnTarget {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+ .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+ originalValue + tupleValue)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+ "80\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithEdgesOnTargetSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+ .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+ originalValue + tupleValue)
+ val res = result.getEdges.collect().toList
+ expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+ "80\n" + "5,1,102\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+
+ final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+ @throws(classOf[Exception])
+ def map(tuple: (Long, Long)): Long = {
+ tuple._1 + tuple._2
+ }
+ }
+
+ final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+ @throws(classOf[Exception])
+ def map(edge: Edge[Long, Long]): (Long, Long) = {
+ (edge.getSource, edge.getValue)
+ }
+ }
+
+ final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+ @throws(classOf[Exception])
+ def map(edge: Edge[Long, Long]): (Long, Long) = {
+ (edge.getTarget, edge.getValue)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
new file mode 100644
index 0000000..98ee8b6
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.VertexToTuple2Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testJoinWithVertexSet {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
+ VertexToTuple2Map[Long, Long]), new AddValuesMapper)
+ val res = result.getVertices.collect().toList
+ expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testJoinWithVertexSetSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
+ val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
+ (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
+ val res = result.getVertices.collect().toList
+ expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+
+ final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+ @throws(classOf[Exception])
+ def map(tuple: (Long, Long)): Long = {
+ tuple._1 + tuple._2
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
new file mode 100644
index 0000000..bdfd569
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+ private var expectedResult: String = null
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithSameValue {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList
+ expectedResult = "1,2,13\n" +
+ "1,3,14\n" + "" +
+ "2,3,24\n" +
+ "3,4,35\n" +
+ "3,5,36\n" +
+ "4,5,46\n" +
+ "5,1,52\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ @Test
+ @throws(classOf[Exception])
+ def testWithSameValueSugar {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+ .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+ val res = graph.mapEdges(edge => edge.getValue + 1)
+ .getEdges.collect().toList
+ expectedResult = "1,2,13\n" +
+ "1,3,14\n" + "" +
+ "2,3,24\n" +
+ "3,4,35\n" +
+ "3,5,36\n" +
+ "4,5,46\n" +
+ "5,1,52\n"
+ TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+ }
+
+ final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
+ @throws(classOf[Exception])
+ def map(edge: Edge[Long, Long]): Long = {
+ edge.getValue + 1
+ }
+ }
+
+}