You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:06:01 UTC

[23/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..2dc272c
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+import org.apache.flink.graph.gsa.GatherFunction
+import org.apache.flink.graph.gsa.Neighbor
+import org.apache.flink.graph.gsa.SumFunction
+import org.apache.flink.graph.gsa.ApplyFunction
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the gather-sum-apply iteration
+    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+      new UpdateDistance, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("GSA Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]) = {
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+    }
+  }
+
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double) = {
+      Math.min(newValue, currentValue)
+    }
+  }
+
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double) = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if(args.length > 0) {
+      if(args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = (3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>");
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
new file mode 100644
index 0000000..4eed824
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.example
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.  
+ * 
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ *   <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
+ */
+object GraphMetrics {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    /** create the graph **/
+    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+    /** get the number of vertices **/
+    val numVertices = graph.numberOfVertices;
+
+    /** get the number of edges **/
+    val numEdges = graph.numberOfEdges;
+
+    /** compute the average node degree **/
+    val verticesWithDegrees = graph.getDegrees;
+    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
+
+    /** find the vertex with the maximum in-degree **/
+    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+    /** find the vertex with the minimum in-degree **/
+    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+    /** find the vertex with the maximum out-degree **/
+    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+    /** find the vertex with the minimum out-degree **/
+    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+    /** print the results **/
+    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
+    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
+    avgDegree.printOnTaskManager("Average node degree")
+    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+
+  }
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 1) {
+        edgesPath = args(0)
+        true
+      } else {
+        System.err.println("Usage: GraphMetrics <edges path>")
+        false
+      }
+    } else {
+      System.out.println("Executing GraphMetrics example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from a file.")
+      System.out.println("  Usage: GraphMetrics <edges path>")
+      true
+    }
+  }
+
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = "\t").map(
+        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
+      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+          for ( i <- 0 to numOutEdges ) {
+            var target: Long = ((Math.random() * numVertices) + 1).toLong
+            new Edge[Long, NullValue](key, target, NullValue.getInstance())
+          }
+      })
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var edgesPath: String = null
+  private var outputPath: String = null
+  private val numVertices = 100
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..65a8e7f
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the vertex-centric iteration
+    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      var minDistance = Double.MaxValue
+      while (inMessages.hasNext) {
+        var msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      for (edge: Edge[Long, Double] <- getEdges) {
+        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if(args.length > 0) {
+      if(args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = (3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>");
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
new file mode 100644
index 0000000..159a100
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.graph.{Graph => JGraph}
+
+import _root_.scala.reflect.ClassTag
+
+
+package object scala {
+  private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+  EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
new file mode 100644
index 0000000..909dbb4
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+@SerialVersionUID(1L)
+class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
+
+  override def map(value: Edge[K, EV]): (K, K, EV) = {
+    (value.getSource, value.getTarget, value.getValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
new file mode 100644
index 0000000..fd6b8c5
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+@SerialVersionUID(1L)
+class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
+
+  override def map(value: (K, VV)): Vertex[K, VV] = {
+    new Vertex(value._1, value._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
new file mode 100644
index 0000000..d0e07cc
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+@SerialVersionUID(1L)
+class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
+
+  override def map(value: (K, K, EV)): Edge[K, EV] = {
+    new Edge(value._1, value._2, value._3)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
new file mode 100644
index 0000000..faf4e10
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+@SerialVersionUID(1L)
+class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
+
+  override def map(value: Vertex[K, VV]): (K, VV) = {
+    (value.getId, value.getValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..55faee3
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import java.lang.reflect.Method
+import org.apache.flink.graph.scala._
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.graph.{Graph => JavaGraph}
+import scala.language.existentials
+import org.junit.Test
+
+/**
+ * This checks whether the Gelly Scala API is up to feature parity with the Java API.
+ * Implements the {@link ScalaAPICompletenessTest} for Gelly.
+ */
+class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+  override def isExcludedByName(method: Method): Boolean = {
+    val name = method.getDeclaringClass.getName + "." + method.getName
+    val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
+    excludedNames.contains(name)
+  }
+
+  @Test
+  override def testCompleteness(): Unit = {
+    checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
new file mode 100644
index 0000000..1c2cf54
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.{Edge, Vertex}
+
+object TestGraphUtils {
+
+    def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
+        return env.fromCollection(getLongLongVertices)
+    }
+
+    def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
+        return env.fromCollection(getLongLongEdges)
+    }
+
+    def getLongLongVertices: List[Vertex[Long, Long]] = {
+        List(
+            new Vertex[Long, Long](1L, 1L),
+            new Vertex[Long, Long](2L, 2L),
+            new Vertex[Long, Long](3L, 3L),
+            new Vertex[Long, Long](4L, 4L),
+            new Vertex[Long, Long](5L, 5L)
+        )
+    }
+
+    def getLongLongEdges: List[Edge[Long, Long]] = {
+        List(
+            new Edge[Long, Long](1L, 2L, 12L),
+            new Edge[Long, Long](1L, 3L, 13L),
+            new Edge[Long, Long](2L, 3L, 23L),
+            new Edge[Long, Long](3L, 4L, 34L),
+            new Edge[Long, Long](3L, 5L, 35L),
+            new Edge[Long, Long](4L, 5L, 45L),
+            new Edge[Long, Long](5L, 1L, 51L)
+        )
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
new file mode 100644
index 0000000..b347049
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testInDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.inDegrees.collect().toList
+    expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testOutDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.outDegrees.collect().toList
+    expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testGetDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getDegrees.collect().toList
+    expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
new file mode 100644
index 0000000..6ceaf16
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+import java.io.IOException
+import org.apache.flink.core.fs.FileInputSplit
+import java.io.File
+import java.io.OutputStreamWriter
+import java.io.FileOutputStream
+import java.io.FileOutputStream
+import com.google.common.base.Charsets
+import org.apache.flink.core.fs.Path
+import org.apache.flink.types.NullValue
+import org.apache.flink.api.common.functions.MapFunction
+
+@RunWith(classOf[Parameterized])
+class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvWithValues {
+    /*
+     * Test with two Csv files, both vertices and edges have values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,1\n2,2\n3,3\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoEdgeValues {
+    /*
+     * Test with two Csv files; edges have no values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,one\n2,two\n3,three\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2\n3,2\n3,1\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, String, NullValue](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        pathEdges = edgesSplit.getPath.toString,
+        hasEdgeValues = false,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvWithMapperValues {
+    /*
+     * Test with edges Csv file and vertex mapper initializer
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Double, Long](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        mapper = new VertexDoubleIdAssigner(),
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoVertexValues {
+    /*
+     * Test with edges Csv file: no vertex values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, NullValue, Long](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
+      "3,1,(null),(null),31\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvNoValues {
+    /*
+     * Test with edges Csv file: neither vertex nor edge values
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edgesContent =  "1,2\n3,2\n3,1\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, NullValue, NullValue](
+        readVertices = false,
+        pathEdges = edgesSplit.getPath.toString,
+        hasEdgeValues = false,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,(null),(null),(null)\n" +
+      "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvOptionsVertices {
+    /*
+     * Test the options for vertices: delimiters, comments, ignore first line.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "42#42\t" + "%this-is-a-comment\t" +
+      "1#1\t" + "2#2\t" + "3#3\t"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        readVertices = true,
+        pathVertices = verticesSplit.getPath.toString,
+        lineDelimiterVertices = "\t",
+        fieldDelimiterVertices = "#",
+        ignoreFirstLineVertices = true,
+        ignoreCommentsVertices = "%",
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCsvOptionsEdges {
+    /*
+     * Test the options for edges: delimiters, comments, ignore first line.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val verticesContent =  "1,1\n2,2\n3,3\n"
+    val verticesSplit = createTempFile(verticesContent)
+    val edgesContent =  "42#42#ignore&" + "1#2#ot&" + "3#2#tt&" + "3#1#to&" +
+      "//this-is-a-comment"
+    val edgesSplit = createTempFile(edgesContent)
+    val graph = Graph.fromCsvReader[Long, Long, String](
+        pathVertices = verticesSplit.getPath.toString,
+        readVertices = true,
+        lineDelimiterEdges = "&",
+        fieldDelimiterEdges = "#",
+        ignoreFirstLineEdges = true,
+        ignoreCommentsEdges = "//",
+        pathEdges = edgesSplit.getPath.toString,
+        env = env)
+    
+    val result = graph.getTriplets.collect()
+    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+  }
+
+  @throws(classOf[IOException])
+  def createTempFile(content: String): FileInputSplit = {
+    val tempFile = File.createTempFile("test_contents", "tmp")
+    tempFile.deleteOnExit()
+
+    val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), Charsets.UTF_8)
+    wrt.write(content)
+    wrt.close()
+
+    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
+        Array("localhost"));
+    }
+
+    final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
+      @throws(classOf[Exception])
+      def map(id: Long): Double = {id.toDouble}
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
new file mode 100644
index 0000000..4b776e2
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexExisting {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexNoEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
+        new Vertex[Long, Long](7L, 7L)))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVerticesExisting {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
+        new Vertex[Long, Long](6L, 6L)))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+        new Vertex[Long, Long](2L, 2L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveValidAndInvalidVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+        new Vertex[Long, Long](6L, 6L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
+      1L), 61L)
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n" + "6,1,61\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
+       new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
+    "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdgesInvalidVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
+       new Edge(7L, 8L, 78L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+    "35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddExistingEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
+      2L), 12L)
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+      "35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+      new Edge(4L, 5L, 45L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveSameEdgeTwiceEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+       new Edge(1L, 2L, 12L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
new file mode 100644
index 0000000..7f7ebc0
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+    
+  @Test
+  @throws(classOf[Exception])
+  def testUndirected {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getUndirected.getEdges.collect().toList;
+
+    expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
+      "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
+      "5,1,51\n" + "1,5,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testReverse {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.reverse().getEdges.collect().toList;
+
+    expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
+      "45\n" + "1,5,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraph {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(vertex: Vertex[Long, Long]): Boolean = {
+        return (vertex.getValue > 2)
+      }
+    }, new FilterFunction[Edge[Long, Long]] {
+
+      @throws(classOf[Exception])
+      override def filter(edge: Edge[Long, Long]): Boolean = {
+        return (edge.getValue > 34)
+      }
+    }).getEdges.collect().toList;
+
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraphSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.subgraph(
+      vertex => vertex.getValue > 2,
+      edge => edge.getValue > 34
+    ).getEdges.collect().toList;
+
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(vertex: Vertex[Long, Long]): Boolean = {
+        vertex.getValue > 2
+      }
+    }).getEdges.collect().toList;
+
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVerticesSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.filterOnVertices(
+      vertex => vertex.getValue > 2
+    ).getEdges.collect().toList;
+
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(edge: Edge[Long, Long]): Boolean = {
+        edge.getValue > 34
+      }
+    }).getEdges.collect().toList;
+
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdgesSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.filterOnEdges(
+      edge => edge.getValue > 34
+    ).getEdges.collect().toList;
+
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = env.fromElements(graph.numberOfVertices).collect().toList
+    expectedResult = "5"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = env.fromElements(graph.numberOfEdges).collect().toList
+    expectedResult = "7"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testVertexIds {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getVertexIds.collect().toList
+    expectedResult = "1\n2\n3\n4\n5\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testEdgesIds {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getEdgeIds.collect().toList
+    expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
+      "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testUnion {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](6L, 6L)
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](6L, 1L, 61L)
+    )
+
+    val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n" + "6,1,61\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testDifference {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
+      new Vertex[Long, Long](6L, 6L) 
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
+      new Edge[Long, Long](6L, 3L, 63L)
+    )
+
+    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testDifferenceNoCommonVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](6L, 6L) 
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](6L, 6L, 66L)
+    )
+
+    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testTriplets {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getTriplets.collect().toList
+    expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
+      "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
new file mode 100644
index 0000000..3dc90fc
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesInputDataset {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+        EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesInputDatasetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+        EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnSource {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnSourceSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnTarget {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "80\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnTargetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    val res = result.getEdges.collect().toList
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "80\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+
+  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+    @throws(classOf[Exception])
+    def map(tuple: (Long, Long)): Long = {
+      tuple._1 + tuple._2
+    }
+  }
+
+  final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): (Long, Long) = {
+      (edge.getSource, edge.getValue)
+    }
+  }
+
+  final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): (Long, Long) = {
+      (edge.getTarget, edge.getValue)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
new file mode 100644
index 0000000..98ee8b6
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.VertexToTuple2Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSet {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
+        VertexToTuple2Map[Long, Long]), new AddValuesMapper)
+    val res = result.getVertices.collect().toList
+    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testJoinWithVertexSetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
+    val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
+      (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
+    val res = result.getVertices.collect().toList
+    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+
+  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+    @throws(classOf[Exception])
+    def map(tuple: (Long, Long)): Long = {
+      tuple._1 + tuple._2
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
new file mode 100644
index 0000000..bdfd569
--- /dev/null
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var expectedResult: String = null
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValue {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList
+    expectedResult = "1,2,13\n" +
+      "1,3,14\n" + "" +
+      "2,3,24\n" +
+      "3,4,35\n" +
+      "3,5,36\n" +
+      "4,5,46\n" +
+      "5,1,52\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithSameValueSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.mapEdges(edge => edge.getValue + 1)
+      .getEdges.collect().toList
+    expectedResult = "1,2,13\n" +
+      "1,3,14\n" + "" +
+      "2,3,24\n" +
+      "3,4,35\n" +
+      "3,5,36\n" +
+      "4,5,46\n" +
+      "5,1,52\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): Long = {
+      edge.getValue + 1
+    }
+  }
+
+}