You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:50 UTC

[12/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
deleted file mode 100644
index ca15dab..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.Collector
-
-
-abstract class NeighborsFunction[K, VV, EV, T] extends org.apache.flink.graph
-.NeighborsFunction[K, VV, EV, T] {
-
-  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T])
-
-  override def iterateNeighbors(neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K,
-    VV]]], out: Collector[T]) = {
-    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
-      .map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
-    iterateNeighbors(scalaIterable, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
deleted file mode 100644
index cefc277..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import java.lang
-
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.Collector
-
-
-abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
-.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
-
-  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])
-    ], out: Collector[T]): Unit
-
-  override def iterateNeighbors(vertex: Vertex[K, VV], neighbors: lang.Iterable[Tuple2[Edge[K,
-    EV], Vertex[K, VV]]], out: Collector[T]): Unit = {
-    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
-      .map(jtuple => (jtuple.f0, jtuple.f1))
-    iterateNeighbors(vertex, scalaIterable, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
deleted file mode 100644
index b3da520..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.library.GSAConnectedComponents
-import java.lang.Long
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in [[org.apache.flink.graph.library]]. 
- * 
- * In particular, this example uses the
- * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 and 1-3.
- *
- * Usage {{
- *   ConnectedComponents <edge path> <result path> <number of iterations>
- *   }}
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
- */
-object ConnectedComponents {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
-
-    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
-
-
-    // emit result
-    if (fileOutput) {
-      components.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Connected Components Example")
-    } else {
-      components.print()
-    }
-  }
-
-  private final class InitVertices extends MapFunction[Long, Long] {
-    override def map(id: Long) = {id}
-  }
-
-  // ***********************************************************************
-  // UTIL METHODS
-  // ***********************************************************************
-
-    private var fileOutput = false
-    private var edgesInputPath: String = null
-    private var outputPath: String = null
-    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
-
-    private def parseParameters(args: Array[String]): Boolean = {
-      if(args.length > 0) {
-        if(args.length != 3) {
-          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-            "<num iterations>")
-          false
-        }
-        fileOutput = true
-        edgesInputPath = args(0)
-        outputPath = args(1)
-        maxIterations = (2).toInt
-      } else {
-        System.out.println("Executing ConnectedComponents example with default parameters" +
-          " and built-in default data.")
-        System.out.println("  Provide parameters to read input data from files.")
-        System.out.println("  See the documentation for the correct format of input files.")
-        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-          "<num iterations>");
-      }
-      true
-    }
-
-    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-      if (fileOutput) {
-        env.readCsvFile[(Long, Long)](edgesInputPath,
-          lineDelimiter = "\n",
-          fieldDelimiter = "\t")
-          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      } else {
-        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
-          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-        }
-        env.fromCollection(edgeData).map(
-        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
deleted file mode 100644
index 2dc272c..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-import org.apache.flink.graph.gsa.GatherFunction
-import org.apache.flink.graph.gsa.Neighbor
-import org.apache.flink.graph.gsa.SumFunction
-import org.apache.flink.graph.gsa.ApplyFunction
-
-/**
- * This example shows how to use Gelly's gather-sum-apply iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object GSASingleSourceShortestPaths {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the gather-sum-apply iteration
-    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
-      new UpdateDistance, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("GSA Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
-    override def gather(neighbor: Neighbor[Double, Double]) = {
-      neighbor.getNeighborValue + neighbor.getEdgeValue
-    }
-  }
-
-  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
-    override def sum(newValue: Double, currentValue: Double) = {
-      Math.min(newValue, currentValue)
-    }
-  }
-
-  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
-    override def apply(newDistance: Double, oldDistance: Double) = {
-      if (newDistance < oldDistance) {
-        setResult(newDistance)
-      }
-    }
-  }
-
-  // **************************************************************************
-  // UTIL METHODS
-  // **************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-        false
-      }
-      fileOutput = true
-      srcVertexId = args(0).toLong
-      edgesInputPath = args(1)
-      outputPath = args(2)
-      maxIterations = (3).toInt
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>");
-    }
-    true
-  }
-
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
deleted file mode 100644
index 4eed824..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices;
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges;
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees;
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            var target: Long = ((Math.random() * numVertices) + 1).toLong
-            new Edge[Long, NullValue](key, target, NullValue.getInstance())
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
deleted file mode 100644
index 65a8e7f..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example;
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
-import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-
-/**
- * This example shows how to use Gelly's vertex-centric iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object SingleSourceShortestPaths {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the vertex-centric iteration
-    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
-      new MinDistanceMessenger, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  /**
-   * Function that updates the value of a vertex by picking the minimum
-   * distance from all incoming messages.
-   */
-  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
-
-    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
-      var minDistance = Double.MaxValue
-      while (inMessages.hasNext) {
-        var msg = inMessages.next
-        if (msg < minDistance) {
-          minDistance = msg
-        }
-      }
-      if (vertex.getValue > minDistance) {
-        setNewVertexValue(minDistance)
-      }
-    }
-  }
-
-  /**
-   * Distributes the minimum distance associated with a given vertex among all
-   * the target vertices summed up with the edge's value.
-   */
-  private final class MinDistanceMessenger extends
-    MessagingFunction[Long, Double, Double, Double] {
-
-    override def sendMessages(vertex: Vertex[Long, Double]) {
-      for (edge: Edge[Long, Double] <- getEdges) {
-        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
-      }
-    }
-  }
-
-  // ****************************************************************************
-  // UTIL METHODS
-  // ****************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-        false
-      }
-      fileOutput = true
-      srcVertexId = args(0).toLong
-      edgesInputPath = args(1)
-      outputPath = args(2)
-      maxIterations = (3).toInt
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>");
-    }
-    true
-  }
-
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
deleted file mode 100644
index 159a100..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.graph.{Graph => JGraph}
-
-import _root_.scala.reflect.ClassTag
-
-
-package object scala {
-  private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
-  EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
deleted file mode 100644
index 909dbb4..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Edge
-
-@SerialVersionUID(1L)
-class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
-
-  override def map(value: Edge[K, EV]): (K, K, EV) = {
-    (value.getSource, value.getTarget, value.getValue)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
deleted file mode 100644
index fd6b8c5..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Vertex
-
-@SerialVersionUID(1L)
-class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
-
-  override def map(value: (K, VV)): Vertex[K, VV] = {
-    new Vertex(value._1, value._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
deleted file mode 100644
index d0e07cc..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Edge
-
-@SerialVersionUID(1L)
-class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
-
-  override def map(value: (K, K, EV)): Edge[K, EV] = {
-    new Edge(value._1, value._2, value._3)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
deleted file mode 100644
index faf4e10..0000000
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.utils
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.Vertex
-
-@SerialVersionUID(1L)
-class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
-
-  override def map(value: Vertex[K, VV]): (K, VV) = {
-    (value.getId, value.getValue)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
deleted file mode 100644
index 55faee3..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.lang.reflect.Method
-import org.apache.flink.graph.scala._
-import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.graph.{Graph => JavaGraph}
-import scala.language.existentials
-import org.junit.Test
-
-/**
- * This checks whether the Gelly Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for Gelly.
- */
-class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
-
-  override def isExcludedByName(method: Method): Boolean = {
-    val name = method.getDeclaringClass.getName + "." + method.getName
-    val excludedNames = Seq("org.apache.flink.graph.Graph.getContext")
-    excludedNames.contains(name)
-  }
-
-  @Test
-  override def testCompleteness(): Unit = {
-    checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
deleted file mode 100644
index 1c2cf54..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.{Edge, Vertex}
-
-object TestGraphUtils {
-
-    def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
-        return env.fromCollection(getLongLongVertices)
-    }
-
-    def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
-        return env.fromCollection(getLongLongEdges)
-    }
-
-    def getLongLongVertices: List[Vertex[Long, Long]] = {
-        List(
-            new Vertex[Long, Long](1L, 1L),
-            new Vertex[Long, Long](2L, 2L),
-            new Vertex[Long, Long](3L, 3L),
-            new Vertex[Long, Long](4L, 4L),
-            new Vertex[Long, Long](5L, 5L)
-        )
-    }
-
-    def getLongLongEdges: List[Edge[Long, Long]] = {
-        List(
-            new Edge[Long, Long](1L, 2L, 12L),
-            new Edge[Long, Long](1L, 3L, 13L),
-            new Edge[Long, Long](2L, 3L, 23L),
-            new Edge[Long, Long](3L, 4L, 34L),
-            new Edge[Long, Long](3L, 5L, 35L),
-            new Edge[Long, Long](4L, 5L, 45L),
-            new Edge[Long, Long](5L, 1L, 51L)
-        )
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
deleted file mode 100644
index b347049..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testInDegrees {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.inDegrees.collect().toList
-    expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testOutDegrees {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.outDegrees.collect().toList
-    expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testGetDegrees {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getDegrees.collect().toList
-    expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
deleted file mode 100644
index 6ceaf16..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-import java.io.IOException
-import org.apache.flink.core.fs.FileInputSplit
-import java.io.File
-import java.io.OutputStreamWriter
-import java.io.FileOutputStream
-import java.io.FileOutputStream
-import com.google.common.base.Charsets
-import org.apache.flink.core.fs.Path
-import org.apache.flink.types.NullValue
-import org.apache.flink.api.common.functions.MapFunction
-
-@RunWith(classOf[Parameterized])
-class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvWithValues {
-    /*
-     * Test with two Csv files, both vertices and edges have values
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val verticesContent =  "1,1\n2,2\n3,3\n"
-    val verticesSplit = createTempFile(verticesContent)
-    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, Long, String](
-        readVertices = true,
-        pathVertices = verticesSplit.getPath.toString,
-        pathEdges = edgesSplit.getPath.toString,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvNoEdgeValues {
-    /*
-     * Test with two Csv files; edges have no values
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val verticesContent =  "1,one\n2,two\n3,three\n"
-    val verticesSplit = createTempFile(verticesContent)
-    val edgesContent =  "1,2\n3,2\n3,1\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, String, NullValue](
-        readVertices = true,
-        pathVertices = verticesSplit.getPath.toString,
-        pathEdges = edgesSplit.getPath.toString,
-        hasEdgeValues = false,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvWithMapperValues {
-    /*
-     * Test with edges Csv file and vertex mapper initializer
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, Double, Long](
-        readVertices = false,
-        pathEdges = edgesSplit.getPath.toString,
-        mapper = new VertexDoubleIdAssigner(),
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvNoVertexValues {
-    /*
-     * Test with edges Csv file: no vertex values
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edgesContent =  "1,2,12\n3,2,32\n3,1,31\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, NullValue, Long](
-        readVertices = false,
-        pathEdges = edgesSplit.getPath.toString,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
-      "3,1,(null),(null),31\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvNoValues {
-    /*
-     * Test with edges Csv file: neither vertex nor edge values
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edgesContent =  "1,2\n3,2\n3,1\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, NullValue, NullValue](
-        readVertices = false,
-        pathEdges = edgesSplit.getPath.toString,
-        hasEdgeValues = false,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,(null),(null),(null)\n" +
-      "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvOptionsVertices {
-    /*
-     * Test the options for vertices: delimiters, comments, ignore first line.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val verticesContent =  "42#42\t" + "%this-is-a-comment\t" +
-      "1#1\t" + "2#2\t" + "3#3\t"
-    val verticesSplit = createTempFile(verticesContent)
-    val edgesContent =  "1,2,ot\n3,2,tt\n3,1,to\n"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, Long, String](
-        readVertices = true,
-        pathVertices = verticesSplit.getPath.toString,
-        lineDelimiterVertices = "\t",
-        fieldDelimiterVertices = "#",
-        ignoreFirstLineVertices = true,
-        ignoreCommentsVertices = "%",
-        pathEdges = edgesSplit.getPath.toString,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testCsvOptionsEdges {
-    /*
-     * Test the options for edges: delimiters, comments, ignore first line.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val verticesContent =  "1,1\n2,2\n3,3\n"
-    val verticesSplit = createTempFile(verticesContent)
-    val edgesContent =  "42#42#ignore&" + "1#2#ot&" + "3#2#tt&" + "3#1#to&" +
-      "//this-is-a-comment"
-    val edgesSplit = createTempFile(edgesContent)
-    val graph = Graph.fromCsvReader[Long, Long, String](
-        pathVertices = verticesSplit.getPath.toString,
-        readVertices = true,
-        lineDelimiterEdges = "&",
-        fieldDelimiterEdges = "#",
-        ignoreFirstLineEdges = true,
-        ignoreCommentsEdges = "//",
-        pathEdges = edgesSplit.getPath.toString,
-        env = env)
-    
-    val result = graph.getTriplets.collect()
-    expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
-  }
-
-  @throws(classOf[IOException])
-  def createTempFile(content: String): FileInputSplit = {
-    val tempFile = File.createTempFile("test_contents", "tmp")
-    tempFile.deleteOnExit()
-
-    val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), Charsets.UTF_8)
-    wrt.write(content)
-    wrt.close()
-
-    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
-        Array("localhost"));
-    }
-
-    final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
-      @throws(classOf[Exception])
-      def map(id: Long): Double = {id.toDouble}
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
deleted file mode 100644
index 4b776e2..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddVertex {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
-    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
-    val res = newgraph.getVertices.collect().toList
-    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddVertexExisting {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
-    val res = newgraph.getVertices.collect().toList
-    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddVertexNoEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
-    val res = newgraph.getVertices.collect().toList
-    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
-    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
-        new Vertex[Long, Long](7L, 7L)))
-    val res = newgraph.getVertices.collect().toList
-    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddVerticesExisting {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-
-    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
-        new Vertex[Long, Long](6L, 6L)))
-    val res = newgraph.getVertices.collect().toList
-    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveVertex {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveInvalidVertex {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
-      "45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
-        new Vertex[Long, Long](2L, 2L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveValidAndInvalidVertex {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
-        new Vertex[Long, Long](6L, 6L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddEdge {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
-      1L), 61L)
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
-      "45\n" + "5,1,51\n" + "6,1,61\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
-       new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
-    "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddEdgesInvalidVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
-       new Edge(7L, 8L, 78L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
-    "35\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testAddExistingEdge {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
-      2L), 12L)
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
-      "35\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveEdge {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveInvalidEdge {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
-      "45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
-      new Edge(4L, 5L, 45L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testRemoveSameEdgeTwiceEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
-       new Edge(1L, 2L, 12L)))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
deleted file mode 100644
index 7f7ebc0..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-    
-  @Test
-  @throws(classOf[Exception])
-  def testUndirected {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getUndirected.getEdges.collect().toList;
-
-    expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
-      "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
-      "5,1,51\n" + "1,5,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testReverse {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.reverse().getEdges.collect().toList;
-
-    expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
-      "45\n" + "1,5,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testSubGraph {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
-      @throws(classOf[Exception])
-      def filter(vertex: Vertex[Long, Long]): Boolean = {
-        return (vertex.getValue > 2)
-      }
-    }, new FilterFunction[Edge[Long, Long]] {
-
-      @throws(classOf[Exception])
-      override def filter(edge: Edge[Long, Long]): Boolean = {
-        return (edge.getValue > 34)
-      }
-    }).getEdges.collect().toList;
-
-    expectedResult = "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testSubGraphSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.subgraph(
-      vertex => vertex.getValue > 2,
-      edge => edge.getValue > 34
-    ).getEdges.collect().toList;
-
-    expectedResult = "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testFilterOnVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
-      @throws(classOf[Exception])
-      def filter(vertex: Vertex[Long, Long]): Boolean = {
-        vertex.getValue > 2
-      }
-    }).getEdges.collect().toList;
-
-    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testFilterOnVerticesSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.filterOnVertices(
-      vertex => vertex.getValue > 2
-    ).getEdges.collect().toList;
-
-    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testFilterOnEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
-      @throws(classOf[Exception])
-      def filter(edge: Edge[Long, Long]): Boolean = {
-        edge.getValue > 34
-      }
-    }).getEdges.collect().toList;
-
-    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testFilterOnEdgesSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.filterOnEdges(
-      edge => edge.getValue > 34
-    ).getEdges.collect().toList;
-
-    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testNumberOfVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = env.fromElements(graph.numberOfVertices).collect().toList
-    expectedResult = "5"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testNumberOfEdges {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = env.fromElements(graph.numberOfEdges).collect().toList
-    expectedResult = "7"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testVertexIds {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getVertexIds.collect().toList
-    expectedResult = "1\n2\n3\n4\n5\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testEdgesIds {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getEdgeIds.collect().toList
-    expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
-      "(5,1)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testUnion {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
-      new Vertex[Long, Long](6L, 6L)
-    )
-    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
-      new Edge[Long, Long](6L, 1L, 61L)
-    )
-
-    val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
-      "45\n" + "5,1,51\n" + "6,1,61\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testDifference {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
-      new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
-      new Vertex[Long, Long](6L, 6L) 
-    )
-    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
-      new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
-      new Edge[Long, Long](6L, 3L, 63L)
-    )
-
-    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "4,5,45\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testDifferenceNoCommonVertices {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
-      new Vertex[Long, Long](6L, 6L) 
-    )
-    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
-      new Edge[Long, Long](6L, 6L, 66L)
-    )
-
-    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
-    val res = newgraph.getEdges.collect().toList
-    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
-      "45\n" + "5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testTriplets {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getTriplets.collect().toList
-    expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
-      "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-}