You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/19 11:10:26 UTC

[4/5] flink git commit: [FLINK-3013] [gelly] Incorrect package declaration in GellyScalaAPICompletenessTest.scala

[FLINK-3013] [gelly] Incorrect package declaration in GellyScalaAPICompletenessTest.scala

This closes #1356.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a300e63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a300e63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a300e63

Branch: refs/heads/release-0.10
Commit: 3a300e639878bf602ffbee1218d7dfd5fe9a99b9
Parents: db456a7
Author: smarthi <sm...@apache.org>
Authored: Sun Nov 15 14:44:14 2015 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 19 11:03:14 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/graph/scala/Graph.scala    |  12 +-
 .../scala/example/ConnectedComponents.scala     |  11 +-
 .../example/GSASingleSourceShortestPaths.scala  |  20 +-
 .../graph/scala/example/GraphMetrics.scala      |   8 +-
 .../example/SingleSourceShortestPaths.scala     |  12 +-
 .../test/GellyScalaAPICompletenessTest.scala    |   2 +-
 .../flink/graph/scala/test/TestGraphUtils.scala |   4 +-
 .../scala/test/operations/DegreesITCase.scala   |  16 +-
 .../operations/GraphCreationWithCsvITCase.scala |  65 ++---
 .../test/operations/GraphMutationsITCase.scala  |  38 +--
 .../test/operations/GraphOperationsITCase.scala |  65 ++---
 .../test/operations/JoinWithEdgesITCase.scala   |   7 +-
 .../operations/JoinWithVerticesITCase.scala     |   7 +-
 .../scala/test/operations/MapEdgesITCase.scala  |   4 +-
 .../test/operations/MapVerticesITCase.scala     |   8 +-
 .../operations/ReduceOnEdgesMethodsITCase.scala |   4 +-
 .../ReduceOnNeighborMethodsITCase.scala         |   4 +-
 .../flink/graph/example/IncrementalSSSP.java    |   6 +-
 .../apache/flink/graph/gsa/GSACompilerTest.java |   6 +-
 .../flink/graph/gsa/GSATranslationTest.java     |   8 +-
 .../graph/spargel/SpargelCompilerTest.java      |  12 +-
 .../graph/spargel/SpargelTranslationTest.java   |  20 +-
 .../test/CollectionModeSuperstepITCase.java     |   2 +-
 .../test/GatherSumApplyConfigurationITCase.java |   5 +-
 .../flink/graph/test/GatherSumApplyITCase.java  |   2 +-
 .../apache/flink/graph/test/TestGraphUtils.java | 291 +++++++++----------
 .../test/VertexCentricConfigurationITCase.java  |  38 ++-
 .../test/example/IncrementalSSSPITCase.java     |   2 +-
 .../graph/test/example/MusicProfilesITCase.java |   2 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   4 +-
 .../graph/test/library/PageRankITCase.java      |  16 +-
 .../graph/test/library/TriangleCountITCase.java |   4 +-
 .../test/operations/GraphCreationITCase.java    |   7 +-
 .../GraphCreationWithMapperITCase.java          |   2 +-
 .../test/operations/GraphMutationsITCase.java   | 222 +++++++-------
 .../test/operations/GraphOperationsITCase.java  |  10 +-
 .../test/operations/JoinWithEdgesITCase.java    |  11 +-
 .../test/operations/JoinWithVerticesITCase.java |   2 +-
 .../graph/test/operations/MapEdgesITCase.java   |   4 +-
 .../test/operations/MapVerticesITCase.java      |   4 +-
 .../operations/ReduceOnEdgesMethodsITCase.java  |  35 +--
 .../ReduceOnEdgesWithExceptionITCase.java       |   4 +-
 .../ReduceOnNeighborMethodsITCase.java          |  62 ++--
 .../ReduceOnNeighborsWithExceptionITCase.java   |   2 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  30 +-
 45 files changed, 518 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index e51453e..11ee7cd 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -170,7 +170,7 @@ object Graph {
   /**
   * Creates a Graph with from a CSV file of vertices and a CSV file of edges
   * 
-  * @param The Execution Environment.
+  * @param env Execution Environment.
   * @param pathEdges The file path containing the edges.
   * @param readVertices Defines whether the vertices have associated values.
   * If set to false, the vertex input is ignored and vertices are created from the edges file.
@@ -868,7 +868,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   * Adds the list of vertices, passed as input, to the graph.
   * If the vertices already exist in the graph, they will not be added once more.
   *
-  * @param verticesToAdd the list of vertices to add
+  * @param vertices the list of vertices to add
   * @return the new graph containing the existing and newly added vertices
   */
   def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
@@ -881,7 +881,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   * When adding an edge for a non-existing set of vertices,
   * the edge is considered invalid and ignored.
   *
-  * @param newEdges the data set of edges to be added
+  * @param edges the data set of edges to be added
   * @return a new graph containing the existing edges plus the newly added edges.
   */
   def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
@@ -916,7 +916,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
     /**
    * Removes the given vertex and its edges from the graph.
    *
-   * @param vertex the vertex to remove
+   * @param vertices list of vertices to remove
    * @return the new graph containing the existing vertices and edges without
    *         the removed vertex and its edges
    */
@@ -938,7 +938,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   /**
    * Removes all the edges that match the edges in the given data set from the graph.
    *
-   * @param edgesToBeRemoved the list of edges to be removed
+   * @param edges the list of edges to be removed
    * @return a new graph where the edges have been removed and in which the vertices remained intact
    */
   def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
@@ -993,7 +993,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values
    * into one new value of the same type.
    * 
-   * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
+   * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
    * @param direction the edge direction (in-, out-, all-)
    * @return a Dataset of Tuple2, with one tuple per vertex.
    * The first field of the Tuple2 is the vertex ID and the second field

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
index b3da520..75b793e 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.graph.scala.example;
+package org.apache.flink.graph.scala.example
 
 import org.apache.flink.api.scala._
 import org.apache.flink.graph.scala._
@@ -32,7 +32,7 @@ import java.lang.Long
  * You can find all available library methods in [[org.apache.flink.graph.library]]. 
  * 
  * In particular, this example uses the
- * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
+ * [[org.apache.flink.graph.library.GSAConnectedComponents]]
  * library method to compute the connected components of the input graph.
  *
  * The input file is a plain text file and must be formatted as follows:
@@ -70,7 +70,7 @@ object ConnectedComponents {
   }
 
   private final class InitVertices extends MapFunction[Long, Long] {
-    override def map(id: Long) = {id}
+    override def map(id: Long) = id
   }
 
   // ***********************************************************************
@@ -87,19 +87,18 @@ object ConnectedComponents {
         if(args.length != 3) {
           System.err.println("Usage ConnectedComponents <edge path> <output path> " +
             "<num iterations>")
-          false
         }
         fileOutput = true
         edgesInputPath = args(0)
         outputPath = args(1)
-        maxIterations = (2).toInt
+        maxIterations = 2
       } else {
         System.out.println("Executing ConnectedComponents example with default parameters" +
           " and built-in default data.")
         System.out.println("  Provide parameters to read input data from files.")
         System.out.println("  See the documentation for the correct format of input files.")
         System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-          "<num iterations>");
+          "<num iterations>")
       }
       true
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
index 2dc272c..68435ba 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -16,20 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.graph.scala.example;
+package org.apache.flink.graph.scala.example
 
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
 import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-import org.apache.flink.graph.gsa.GatherFunction
-import org.apache.flink.graph.gsa.Neighbor
-import org.apache.flink.graph.gsa.SumFunction
-import org.apache.flink.graph.gsa.ApplyFunction
+import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
 
 /**
  * This example shows how to use Gelly's gather-sum-apply iterations.
@@ -121,20 +116,19 @@ object GSASingleSourceShortestPaths {
       if(args.length != 4) {
         System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
           " <input edges path> <output path> <num iterations>")
-        false
       }
       fileOutput = true
       srcVertexId = args(0).toLong
       edgesInputPath = args(1)
       outputPath = args(2)
-      maxIterations = (3).toInt
+      maxIterations = 3
     } else {
       System.out.println("Executing Single Source Shortest Paths example "
         + "with default parameters and built-in default data.")
       System.out.println("  Provide parameters to read input data from files.")
       System.out.println("  See the documentation for the correct format of input files.")
       System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>");
+        " <input edges path> <output path> <num iterations>")
     }
     true
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
index 4eed824..1c3fcdd 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -53,13 +53,13 @@ object GraphMetrics {
     val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
 
     /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices;
+    val numVertices = graph.numberOfVertices
 
     /** get the number of edges **/
-    val numEdges = graph.numberOfEdges;
+    val numEdges = graph.numberOfEdges
 
     /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees;
+    val verticesWithDegrees = graph.getDegrees
     val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
 
     /** find the vertex with the maximum in-degree **/
@@ -114,7 +114,7 @@ object GraphMetrics {
         (key: Long, out: Collector[Edge[Long, NullValue]]) => {
           val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
           for ( i <- 0 to numOutEdges ) {
-            var target: Long = ((Math.random() * numVertices) + 1).toLong
+            val target: Long = ((Math.random() * numVertices) + 1).toLong
             new Edge[Long, NullValue](key, target, NullValue.getInstance())
           }
       })

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
index 65a8e7f..7fc23c4 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.graph.scala.example;
+package org.apache.flink.graph.scala.example
 
 import org.apache.flink.api.scala._
 import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
 import org.apache.flink.graph.Edge
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.graph.spargel.VertexUpdateFunction
@@ -95,7 +94,7 @@ object SingleSourceShortestPaths {
     override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
       var minDistance = Double.MaxValue
       while (inMessages.hasNext) {
-        var msg = inMessages.next
+        val msg = inMessages.next
         if (msg < minDistance) {
           minDistance = msg
         }
@@ -115,7 +114,7 @@ object SingleSourceShortestPaths {
 
     override def sendMessages(vertex: Vertex[Long, Double]) {
       for (edge: Edge[Long, Double] <- getEdges) {
-        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
       }
     }
   }
@@ -135,20 +134,19 @@ object SingleSourceShortestPaths {
       if(args.length != 4) {
         System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
           " <input edges path> <output path> <num iterations>")
-        false
       }
       fileOutput = true
       srcVertexId = args(0).toLong
       edgesInputPath = args(1)
       outputPath = args(2)
-      maxIterations = (3).toInt
+      maxIterations = 3
     } else {
       System.out.println("Executing Single Source Shortest Paths example "
         + "with default parameters and built-in default data.")
       System.out.println("  Provide parameters to read input data from files.")
       System.out.println("  See the documentation for the correct format of input files.")
       System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>");
+        " <input edges path> <output path> <num iterations>")
     }
     true
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
index 55faee3..d7ab1dd 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.api.scala
+package org.apache.flink.graph.scala.test
 
 import java.lang.reflect.Method
 import org.apache.flink.graph.scala._

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
index 1c2cf54..2fedfc7 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
@@ -24,11 +24,11 @@ import org.apache.flink.graph.{Edge, Vertex}
 object TestGraphUtils {
 
     def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
-        return env.fromCollection(getLongLongVertices)
+        env.fromCollection(getLongLongVertices)
     }
 
     def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
-        return env.fromCollection(getLongLongEdges)
+        env.fromCollection(getLongLongEdges)
     }
 
     def getLongLongVertices: List[Vertex[Long, Long]] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
index b347049..2a2b34e 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -22,10 +22,10 @@ import org.apache.flink.api.scala._
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
@@ -36,33 +36,33 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testInDegrees {
+  def testInDegrees() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.inDegrees.collect().toList
+    val res = graph.inDegrees().collect().toList
     expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testOutDegrees {
+  def testOutDegrees() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.outDegrees.collect().toList
+    val res = graph.outDegrees().collect().toList
     expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testGetDegrees {
+  def testGetDegrees() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getDegrees.collect().toList
+    val res = graph.getDegrees().collect().toList
     expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
index a963845..253040b 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -18,25 +18,20 @@
 
 package org.apache.flink.graph.scala.test.operations
 
+import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
+
+import com.google.common.base.Charsets
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.{FileInputSplit, Path}
 import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.apache.flink.types.NullValue
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
-import java.io.IOException
-import org.apache.flink.core.fs.FileInputSplit
-import java.io.File
-import java.io.OutputStreamWriter
-import java.io.FileOutputStream
-import java.io.FileOutputStream
-import com.google.common.base.Charsets
-import org.apache.flink.core.fs.Path
-import org.apache.flink.types.NullValue
-import org.apache.flink.api.common.functions.MapFunction
 
 @RunWith(classOf[Parameterized])
 class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
@@ -46,7 +41,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testCsvWithValues {
+  def testCsvWithValues() {
     /*
      * Test with two Csv files, both vertices and edges have values
      */
@@ -61,14 +56,14 @@ MultipleProgramsTestBase(mode) {
         pathEdges = edgesSplit.getPath.toString,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvNoEdgeValues {
+  def testCsvNoEdgeValues() {
     /*
      * Test with two Csv files; edges have no values
      */
@@ -84,14 +79,14 @@ MultipleProgramsTestBase(mode) {
         hasEdgeValues = false,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvWithMapperValues {
+  def testCsvWithMapperValues() {
     /*
      * Test with edges Csv file and vertex mapper initializer
      */
@@ -104,14 +99,14 @@ MultipleProgramsTestBase(mode) {
         vertexValueInitializer = new VertexDoubleIdAssigner(),
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvNoVertexValues {
+  def testCsvNoVertexValues() {
     /*
      * Test with edges Csv file: no vertex values
      */
@@ -123,15 +118,15 @@ MultipleProgramsTestBase(mode) {
         pathEdges = edgesSplit.getPath.toString,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
       "3,1,(null),(null),31\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvNoValues {
+  def testCsvNoValues() {
     /*
      * Test with edges Csv file: neither vertex nor edge values
      */
@@ -144,15 +139,15 @@ MultipleProgramsTestBase(mode) {
         hasEdgeValues = false,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,(null),(null),(null)\n" +
       "3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvOptionsVertices {
+  def testCsvOptionsVertices() {
     /*
      * Test the options for vertices: delimiters, comments, ignore first line.
      */
@@ -172,14 +167,14 @@ MultipleProgramsTestBase(mode) {
         pathEdges = edgesSplit.getPath.toString,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testCsvOptionsEdges {
+  def testCsvOptionsEdges() {
     /*
      * Test the options for edges: delimiters, comments, ignore first line.
      */
@@ -199,9 +194,9 @@ MultipleProgramsTestBase(mode) {
         pathEdges = edgesSplit.getPath.toString,
         env = env)
     
-    val result = graph.getTriplets.collect()
+    val result = graph.getTriplets().collect()
     expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
-    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
+    TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
   }
 
   @throws(classOf[IOException])
@@ -214,7 +209,7 @@ MultipleProgramsTestBase(mode) {
     wrt.close()
 
     new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
-        Array("localhost"));
+        Array("localhost"))
     }
 
     final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
index 4b776e2..f6acdc1 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -23,10 +23,10 @@ import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.graph.{Edge, Vertex}
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+import org.junit.Test
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
@@ -37,7 +37,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddVertex {
+  def testAddVertex() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -50,7 +50,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddVertexExisting {
+  def testAddVertexExisting() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddVertexNoEdges {
+  def testAddVertexNoEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -74,7 +74,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddVertices {
+  def testAddVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -88,7 +88,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddVerticesExisting {
+  def testAddVerticesExisting() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -102,7 +102,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveVertex {
+  def testRemoveVertex() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -114,7 +114,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveInvalidVertex {
+  def testRemoveInvalidVertex() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -127,7 +127,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveVertices {
+  def testRemoveVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -140,7 +140,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveValidAndInvalidVertex {
+  def testRemoveValidAndInvalidVertex() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -153,7 +153,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddEdge {
+  def testAddEdge() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -167,7 +167,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddEdges {
+  def testAddEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -181,7 +181,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddEdgesInvalidVertices {
+  def testAddEdgesInvalidVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -195,7 +195,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testAddExistingEdge {
+  def testAddExistingEdge() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -209,7 +209,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveEdge {
+  def testRemoveEdge() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -221,7 +221,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveInvalidEdge {
+  def testRemoveInvalidEdge() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -234,7 +234,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveEdges {
+  def testRemoveEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -247,7 +247,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testRemoveSameEdgeTwiceEdges {
+  def testRemoveSameEdgeTwiceEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
index 7f7ebc0..9d77e68 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -23,10 +23,9 @@ import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.graph.{Edge, Vertex}
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+import org.junit.Test
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
@@ -37,11 +36,11 @@ MultipleProgramsTestBase(mode) {
     
   @Test
   @throws(classOf[Exception])
-  def testUndirected {
+  def testUndirected() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getUndirected.getEdges.collect().toList;
+    val res = graph.getUndirected().getEdges.collect().toList
 
     expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
       "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
@@ -51,11 +50,11 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testReverse {
+  def testReverse() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.reverse().getEdges.collect().toList;
+    val res = graph.reverse().getEdges.collect().toList
 
     expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
       "45\n" + "1,5,51\n"
@@ -64,22 +63,22 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testSubGraph {
+  def testSubGraph() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
       @throws(classOf[Exception])
       def filter(vertex: Vertex[Long, Long]): Boolean = {
-        return (vertex.getValue > 2)
+        vertex.getValue > 2
       }
     }, new FilterFunction[Edge[Long, Long]] {
 
       @throws(classOf[Exception])
       override def filter(edge: Edge[Long, Long]): Boolean = {
-        return (edge.getValue > 34)
+        edge.getValue > 34
       }
-    }).getEdges.collect().toList;
+    }).getEdges.collect().toList
 
     expectedResult = "3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -87,14 +86,14 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testSubGraphSugar {
+  def testSubGraphSugar() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val res = graph.subgraph(
       vertex => vertex.getValue > 2,
       edge => edge.getValue > 34
-    ).getEdges.collect().toList;
+    ).getEdges.collect().toList
 
     expectedResult = "3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -102,7 +101,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testFilterOnVertices {
+  def testFilterOnVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -111,7 +110,7 @@ MultipleProgramsTestBase(mode) {
       def filter(vertex: Vertex[Long, Long]): Boolean = {
         vertex.getValue > 2
       }
-    }).getEdges.collect().toList;
+    }).getEdges.collect().toList
 
     expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -119,13 +118,13 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testFilterOnVerticesSugar {
+  def testFilterOnVerticesSugar() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val res = graph.filterOnVertices(
       vertex => vertex.getValue > 2
-    ).getEdges.collect().toList;
+    ).getEdges.collect().toList
 
     expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -133,7 +132,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testFilterOnEdges {
+  def testFilterOnEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -142,7 +141,7 @@ MultipleProgramsTestBase(mode) {
       def filter(edge: Edge[Long, Long]): Boolean = {
         edge.getValue > 34
       }
-    }).getEdges.collect().toList;
+    }).getEdges.collect().toList
 
     expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -150,13 +149,13 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testFilterOnEdgesSugar {
+  def testFilterOnEdgesSugar() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val res = graph.filterOnEdges(
       edge => edge.getValue > 34
-    ).getEdges.collect().toList;
+    ).getEdges.collect().toList
 
     expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
@@ -164,44 +163,44 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testNumberOfVertices {
+  def testNumberOfVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = env.fromElements(graph.numberOfVertices).collect().toList
+    val res = env.fromElements(graph.numberOfVertices()).collect().toList
     expectedResult = "5"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testNumberOfEdges {
+  def testNumberOfEdges() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = env.fromElements(graph.numberOfEdges).collect().toList
+    val res = env.fromElements(graph.numberOfEdges()).collect().toList
     expectedResult = "7"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testVertexIds {
+  def testVertexIds() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getVertexIds.collect().toList
+    val res = graph.getVertexIds().collect().toList
     expectedResult = "1\n2\n3\n4\n5\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
   @throws(classOf[Exception])
-  def testEdgesIds {
+  def testEdgesIds() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getEdgeIds.collect().toList
+    val res = graph.getEdgeIds().collect().toList
     expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
       "(5,1)\n"
     TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
@@ -209,7 +208,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testUnion {
+  def testUnion() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -229,7 +228,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testDifference {
+  def testDifference() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -250,7 +249,7 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testDifferenceNoCommonVertices {
+  def testDifferenceNoCommonVertices() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
@@ -270,11 +269,11 @@ MultipleProgramsTestBase(mode) {
 
   @Test
   @throws(classOf[Exception])
-  def testTriplets {
+  def testTriplets() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.getTriplets.collect().toList
+    val res = graph.getTriplets().collect().toList
     expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
       "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index 83fa61b..0a7f1b9 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -20,17 +20,16 @@ package org.apache.flink.graph.scala.test.operations
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.scala._
-import org.apache.flink.graph.Edge
+import org.apache.flink.graph.{Edge, EdgeJoinFunction}
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
-import org.apache.flink.graph.EdgeJoinFunction
 
 @RunWith(classOf[Parameterized])
 class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index f2beb7b..5998270 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -18,18 +18,17 @@
 
 package org.apache.flink.graph.scala.test.operations
 
-import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.scala._
+import org.apache.flink.graph.VertexJoinFunction
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.graph.scala.utils.VertexToTuple2Map
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
-import org.apache.flink.graph.VertexJoinFunction
 
 @RunWith(classOf[Parameterized])
 class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
index bdfd569..4c1d1f0 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -24,10 +24,10 @@ import org.apache.flink.graph.Edge
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
index 2e51d90..a27b42c 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -24,10 +24,10 @@ import org.apache.flink.graph.Vertex
 import org.apache.flink.graph.scala._
 import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
@@ -47,7 +47,7 @@ MultipleProgramsTestBase(mode) {
       "2,3\n" +
       "3,4\n" +
       "4,5\n" +
-      "5,6\n";
+      "5,6\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
       "2,3\n" +
       "3,4\n" +
       "4,5\n" +
-      "5,6\n";
+      "5,6\n"
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
index dcd1deb..6dda547 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.test.TestGraphUtils
 import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.apache.flink.util.Collector
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
index aef5493..67e9b9a 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
 import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
 import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.apache.flink.util.Collector
-import org.junit.rules.TemporaryFolder
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
+
 import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index 947f343..f454376 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -135,7 +135,7 @@ public class IncrementalSSSP implements ProgramDescription {
 	 *
 	 * @param edgeToBeRemoved
 	 * @param edgesInSSSP
-	 * @return
+	 * @return true or false
 	 */
 	public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
 
@@ -154,9 +154,7 @@ public class IncrementalSSSP implements ProgramDescription {
 			if (inMessages.hasNext()) {
 				Long outDegree = getOutDegree() - 1;
 				// check if the vertex has another SP-Edge
-				if (outDegree > 0) {
-					// there is another shortest path from the source to this vertex
-				} else {
+				if (outDegree <= 0) {
 					// set own value to infinity
 					setNewVertexValue(Double.MAX_VALUE);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
index 2ad203f..cd677b6 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -56,7 +56,7 @@ public class GSACompilerTest extends CompilerTestBase {
 			env.setParallelism(DEFAULT_PARALLELISM);
 			// compose test program
 			{
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
 						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 
 				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
@@ -124,7 +124,7 @@ public class GSACompilerTest extends CompilerTestBase {
 		public Long gather(Neighbor<Long, NullValue> neighbor) {
 			return neighbor.getNeighborValue();
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
@@ -132,7 +132,7 @@ public class GSACompilerTest extends CompilerTestBase {
 		public Long sum(Long newValue, Long currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
index ced7508..2deebcb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -71,7 +71,7 @@ public class GSATranslationTest {
 			// ------------ construct the test program ------------------
 			{
 
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
 						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 
 				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
@@ -98,7 +98,7 @@ public class GSATranslationTest {
 			assertTrue(result instanceof DeltaIterationResultSet);
 			
 			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
 			
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
@@ -142,7 +142,7 @@ public class GSATranslationTest {
 		public Long gather(Neighbor<Long, NullValue> neighbor) {
 			return neighbor.getNeighborValue();
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
@@ -150,7 +150,7 @@ public class GSATranslationTest {
 		public Long sum(Long newValue, Long currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 7a8143a..335481f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -61,14 +61,14 @@ public class SpargelCompilerTest extends CompilerTestBase {
 			{
 
 				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
 					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
 
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-							return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
 						}
 				});
 
@@ -143,14 +143,14 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				DataSet<Long> bcVar = env.fromElements(1L);
 
 				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
 						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
 
 							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-								return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+								return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
 							}
 					});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index bb3a131..69aa99c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -52,7 +52,7 @@ public class SpargelTranslationTest {
 			final String BC_SET_MESSAGES_NAME = "borat messages";
 			
 			final String BC_SET_UPDATES_NAME = "borat updates";
-			;
+
 			final int NUM_ITERATIONS = 13;
 			
 			final int ITERATION_parallelism = 77;
@@ -68,16 +68,16 @@ public class SpargelTranslationTest {
 			// ------------ construct the test program ------------------
 			{
 				
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
 
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
 				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
 						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
 
 							public Tuple3<String, String, NullValue> map(
 									Tuple2<String, String> edge) {
-								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+								return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
 							}
 						}), env);
 
@@ -101,7 +101,7 @@ public class SpargelTranslationTest {
 			assertTrue(result instanceof DeltaIterationResultSet);
 			
 			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
 			
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
@@ -139,7 +139,7 @@ public class SpargelTranslationTest {
 			final String BC_SET_MESSAGES_NAME = "borat messages";
 			
 			final String BC_SET_UPDATES_NAME = "borat updates";
-			;
+
 			final int NUM_ITERATIONS = 13;
 			
 			final int ITERATION_parallelism = 77;
@@ -154,16 +154,16 @@ public class SpargelTranslationTest {
 			// ------------ construct the test program ------------------
 			{
 
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
 
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
 				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
 						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
 
 							public Tuple3<String, String, NullValue> map(
 									Tuple2<String, String> edge) {
-								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+								return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
 							}
 						}), env);
 
@@ -187,7 +187,7 @@ public class SpargelTranslationTest {
 			assertTrue(result instanceof DeltaIterationResultSet);
 			
 			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
 			
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index 3fbd0bc..61fe0c2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -78,7 +78,7 @@ public class CollectionModeSuperstepITCase {
 	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
 		public Long map(Vertex<Long, Long> value) {
-			return 1l;
+			return 1L;
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 67d32a8..1e44d5b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -382,11 +382,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
 		@Override
 		public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
-			HashSet<Long> set = currentSet;
 			for(Long l : newSet) {
-				set.add(l);
+				currentSet.add(l);
 			}
-			return set;
+			return currentSet;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 0213f02..039a05c 100755
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -79,7 +79,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 				new InitMapperSSSP(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(
-        		new GSASingleSourceShortestPaths<Long>(1l, 16)).collect();
+        		new GSASingleSourceShortestPaths<>(1L, 16)).collect();
 
 		expectedResult = "1,0.0\n" +
 				"2,12.0\n" +