You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/09/29 00:39:02 UTC

[2/3] flink git commit: [FLINK-2561] [gelly] convert existing tests to use collect instead of files; add tests for newly added operations.

[FLINK-2561] [gelly] convert existing tests to use collect instead of files;
add tests for newly added operations.

Add completeness test: fromCsvReader method is missing.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e0284ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e0284ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e0284ef

Branch: refs/heads/master
Commit: 9e0284efa90561c88b1bc829b800d89e84477caa
Parents: 0b4dc06
Author: vasia <va...@apache.org>
Authored: Thu Sep 24 23:31:38 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Sep 29 00:38:20 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly-scala/pom.xml         |   8 +-
 .../org/apache/flink/graph/scala/Graph.scala    |   9 +-
 .../test/GellyScalaAPICompletenessTest.scala    |  45 +++++
 .../scala/test/operations/DegreesITCase.scala   |  39 ++---
 .../test/operations/GraphMutationsITCase.scala  | 165 ++++++++++++++-----
 .../test/operations/GraphOperationsITCase.scala | 151 +++++++++++------
 .../test/operations/JoinWithEdgesITCase.scala   |  45 ++---
 .../operations/JoinWithVerticesITCase.scala     |  29 +---
 .../scala/test/operations/MapEdgesITCase.scala  |  33 +---
 .../test/operations/MapVerticesITCase.scala     |  33 +---
 .../operations/ReduceOnEdgesMethodsITCase.scala |  63 +++----
 .../ReduceOnNeighborMethodsITCase.scala         |  46 ++----
 12 files changed, 355 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml
index a1f0da7..edcb865 100644
--- a/flink-staging/flink-gelly-scala/pom.xml
+++ b/flink-staging/flink-gelly-scala/pom.xml
@@ -48,7 +48,13 @@ under the License.
             <artifactId>flink-gelly</artifactId>
             <version>${project.version}</version>
         </dependency>
-
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index ed58ffd..35af1ed 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -100,7 +100,8 @@ object Graph {
                               env: ExecutionEnvironment): Graph[K, VV, EV] = {
     val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
-    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv))
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
+        env.getJavaEnv))
   }
 
   /**
@@ -678,7 +679,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   /**
   * Adds the given list edges to the graph.
   *
-  * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
+  * When adding an edge for a non-existing set of vertices,
+  * the edge is considered invalid and ignored.
   *
   * @param newEdges the data set of edges to be added
   * @return a new graph containing the existing edges plus the newly added edges.
@@ -757,7 +759,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
 
   /**
   * Performs Difference on the vertex and edge sets of the input graphs
-  * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed
+  * removes common vertices and edges. If a source/target vertex is removed,
+  * its corresponding edge will also be removed
   * @param graph the graph to perform difference with
   * @return a new graph where the common vertices and edges have been removed
   */

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
new file mode 100644
index 0000000..c63c4f8
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import java.lang.reflect.Method
+import org.apache.flink.graph.scala._
+import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
+import org.apache.flink.graph.{Graph => JavaGraph}
+import scala.language.existentials
+import org.junit.Test
+
+/**
+ * This checks whether the Gelly Scala API is up to feature parity with the Java API.
+ * Implements the {@link ScalaAPICompletenessTestBase} for Gelly.
+ */
+class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
+
+  override def isExcludedByName(method: Method): Boolean = {
+    val name = method.getDeclaringClass.getName + "." + method.getName
+    val excludedNames = Seq("org.apache.flink.graph.Graph.getContext",
+        // NOTE: until fromCsvReader() is added to the Scala API Graph
+        "org.apache.flink.graph.Graph.fromCsvReader")
+    excludedNames.contains(name)
+  }
+
+  @Test
+  override def testCompleteness(): Unit = {
+    checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
index 98dbbe9..6196f99 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -26,42 +26,23 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testInDegrees {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.inDegrees().writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"
+    val res = graph.inDegrees().collect.toList
+    expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -70,9 +51,9 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.outDegrees().writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"
+    val res = graph.outDegrees().collect.toList
+    expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -81,8 +62,8 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.getDegrees().writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"
+    val res = graph.getDegrees().collect.toList
+    expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
index 687b0a7..3cb92c4 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -27,33 +27,14 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testAddVertex {
@@ -62,9 +43,9 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
 
     val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
-    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getVertices.collect().toList
     expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -74,9 +55,9 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
-    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getVertices.collect().toList
     expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -86,9 +67,37 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
-    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L),
+        new Vertex[Long, Long](7L, 7L)))
+    val res = newgraph.getVertices.collect().toList
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVerticesExisting {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L),
+        new Vertex[Long, Long](6L, 6L)))
+    val res = newgraph.getVertices.collect().toList
     expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -98,9 +107,9 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -110,10 +119,36 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect.toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+        new Vertex[Long, Long](2L, 2L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveValidAndInvalidVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L),
+        new Vertex[Long, Long](6L, 6L)))
+    val res = newgraph.getEdges.collect.toList
+    expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -124,10 +159,38 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
       1L), 61L)
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect.toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n" + "6,1,61\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L),
+       new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," +
+    "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdgesInvalidVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L),
+       new Edge(7L, 8L, 78L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+    "35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -138,10 +201,10 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
       2L), 12L)
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect.toList
     expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
       "35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -151,9 +214,9 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect.toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -163,9 +226,35 @@ MultipleProgramsTestBase(mode) {
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect.toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+      new Edge(4L, 5L, 45L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveSameEdgeTwiceEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L),
+       new Edge(1L, 2L, 12L)))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
index 713eb8d..7f7ebc0 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -27,44 +27,26 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
-
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
+    
   @Test
   @throws(classOf[Exception])
   def testUndirected {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = graph.getUndirected.getEdges.collect().toList;
+
     expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
       "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
       "5,1,51\n" + "1,5,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -73,10 +55,11 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = graph.reverse().getEdges.collect().toList;
+
     expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
       "45\n" + "1,5,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -85,7 +68,7 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+    val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
       @throws(classOf[Exception])
       def filter(vertex: Vertex[Long, Long]): Boolean = {
         return (vertex.getValue > 2)
@@ -96,9 +79,10 @@ MultipleProgramsTestBase(mode) {
       override def filter(edge: Edge[Long, Long]): Boolean = {
         return (edge.getValue > 34)
       }
-    }).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    }).getEdges.collect().toList;
+
     expectedResult = "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -107,12 +91,13 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.subgraph(
+    val res = graph.subgraph(
       vertex => vertex.getValue > 2,
       edge => edge.getValue > 34
-    ).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    ).getEdges.collect().toList;
+
     expectedResult = "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -121,14 +106,15 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+    val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
       @throws(classOf[Exception])
       def filter(vertex: Vertex[Long, Long]): Boolean = {
         vertex.getValue > 2
       }
-    }).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    }).getEdges.collect().toList;
+
     expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -137,11 +123,12 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.filterOnVertices(
+    val res = graph.filterOnVertices(
       vertex => vertex.getValue > 2
-    ).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    ).getEdges.collect().toList;
+
     expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -150,14 +137,15 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+    val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
       @throws(classOf[Exception])
       def filter(edge: Edge[Long, Long]): Boolean = {
         edge.getValue > 34
       }
-    }).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    }).getEdges.collect().toList;
+
     expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -166,11 +154,12 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.filterOnEdges(
+    val res = graph.filterOnEdges(
       edge => edge.getValue > 34
-    ).getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    ).getEdges.collect().toList;
+
     expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -179,9 +168,9 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    env.fromElements(graph.numberOfVertices).writeAsText(resultPath)
-    env.execute
+    val res = env.fromElements(graph.numberOfVertices).collect().toList
     expectedResult = "5"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -190,9 +179,9 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    env.fromElements(graph.numberOfEdges).writeAsText(resultPath)
-    env.execute
+    val res = env.fromElements(graph.numberOfEdges).collect().toList
     expectedResult = "7"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -201,9 +190,9 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.getVertexIds.writeAsText(resultPath)
-    env.execute
+    val res = graph.getVertexIds.collect().toList
     expectedResult = "1\n2\n3\n4\n5\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -212,9 +201,10 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.getEdgeIds.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"
+    val res = graph.getEdgeIds.collect().toList
+    expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
+      "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -231,9 +221,62 @@ MultipleProgramsTestBase(mode) {
     )
 
     val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
-    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
-    env.execute
+    val res = newgraph.getEdges.collect().toList
     expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
       "45\n" + "5,1,51\n" + "6,1,61\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testDifference {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L),
+      new Vertex[Long, Long](6L, 6L) 
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L),
+      new Edge[Long, Long](6L, 3L, 63L)
+    )
+
+    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "4,5,45\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testDifferenceNoCommonVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](6L, 6L) 
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](6L, 6L, 66L)
+    )
+
+    val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env))
+    val res = newgraph.getEdges.collect().toList
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testTriplets {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val res = graph.getTriplets.collect().toList
+    expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
+      "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index e19463e..eae8bd5 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -29,33 +29,14 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testWithEdgesInputDataset {
@@ -64,10 +45,10 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
         EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
       "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -79,10 +60,10 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
         EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
       originalValue + tupleValue)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
       "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -94,10 +75,10 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
       .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
       originalValue + tupleValue)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
       "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -109,10 +90,10 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
       .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
       originalValue + tupleValue)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
       "90\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -124,10 +105,10 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
       .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
       originalValue + tupleValue)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
       "80\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -139,10 +120,10 @@ MultipleProgramsTestBase(mode) {
     val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
       .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
       originalValue + tupleValue)
-    result.getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getEdges.collect.toList
     expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
       "80\n" + "5,1,102\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 4b8f354..8d18d58 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -28,33 +28,14 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testJoinWithVertexSet {
@@ -63,9 +44,9 @@ MultipleProgramsTestBase(mode) {
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
         VertexToTuple2Map[Long, Long]), new AddValuesMapper)
-    result.getVerticesAsTuple2().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getVertices.collect.toList
     expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -77,9 +58,9 @@ MultipleProgramsTestBase(mode) {
     val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
     val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
       (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
-    result.getVerticesAsTuple2().writeAsCsv(resultPath)
-    env.execute
+    val res = result.getVertices.collect.toList
     expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
index 7e5ad14..0fa8d2b 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.scala.test.operations
 
-
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.graph.Edge
@@ -29,42 +28,21 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testWithSameValue {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.mapEdges(new AddOneMapper)
-      .getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList
     expectedResult = "1,2,13\n" +
       "1,3,14\n" + "" +
       "2,3,24\n" +
@@ -72,6 +50,7 @@ MultipleProgramsTestBase(mode) {
       "3,5,36\n" +
       "4,5,46\n" +
       "5,1,52\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -80,9 +59,8 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.mapEdges(edge => edge.getValue + 1)
-      .getEdgesAsTuple3().writeAsCsv(resultPath)
-    env.execute
+    val res = graph.mapEdges(edge => edge.getValue + 1)
+      .getEdges.collect.toList
     expectedResult = "1,2,13\n" +
       "1,3,14\n" + "" +
       "2,3,24\n" +
@@ -90,6 +68,7 @@ MultipleProgramsTestBase(mode) {
       "3,5,36\n" +
       "4,5,46\n" +
       "5,1,52\n"
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
index a22cfbd..c1ab3ea 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
@@ -28,48 +28,27 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
 MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testWithSameValue {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.mapVertices(new AddOneMapper)
-      .getVerticesAsTuple2().writeAsCsv(resultPath)
-    env.execute
-
+    val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList
     expectedResult = "1,2\n" +
       "2,3\n" +
       "3,4\n" +
       "4,5\n" +
       "5,6\n";
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   @Test
@@ -78,15 +57,13 @@ MultipleProgramsTestBase(mode) {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.mapVertices(vertex => vertex.getValue + 1)
-      .getVerticesAsTuple2().writeAsCsv(resultPath)
-    env.execute
-
+    val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList
     expectedResult = "1,2\n" +
       "2,3\n" +
       "3,4\n" +
       "4,5\n" +
       "5,6\n";
+    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
   final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
index 6ed383a..695f74a 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
@@ -28,46 +28,24 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
   extends MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testAllNeighborsWithValueGreaterThanFour {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
-      EdgeDirection.ALL)
-    result.writeAsCsv(resultPath)
-    env.execute
-
-
-    expectedResult = "5,1\n" + "5,3\n" + "5,4"
+    val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
+      EdgeDirection.ALL).collect.toList
+    expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
 
@@ -77,13 +55,12 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
-    result.writeAsCsv(resultPath)
-    env.execute
-
-
-    expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" +
-      "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4"
+    val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
+    .collect.toList
+    expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
+      "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
+      "(5,1)\n" + "(5,3)\n" + "(5,4)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -94,9 +71,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
         SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
-    verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
+    val res = verticesWithLowestOutNeighbor.collect.toList
+    expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -107,9 +84,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
         SelectMinWeightNeighborNoValue, EdgeDirection.IN)
-    verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
+    val res = verticesWithLowestOutNeighbor.collect.toList
+    expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -120,9 +97,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
         SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
-    verticesWithMaxEdgeWeight.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
+    val res = verticesWithMaxEdgeWeight.collect.toList
+    expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0284ef/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
index 52e6d7a..b01e750 100644
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
@@ -28,42 +28,24 @@ import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
+import _root_.scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
   extends MultipleProgramsTestBase(mode) {
 
-  private var resultPath: String = null
   private var expectedResult: String = null
 
-  var tempFolder: TemporaryFolder = new TemporaryFolder()
-
-  @Rule
-  def getFolder(): TemporaryFolder = {
-    tempFolder;
-  }
-
-  @Before
-  @throws(classOf[Exception])
-  def before {
-    resultPath = tempFolder.newFile.toURI.toString
-  }
-
-  @After
-  @throws(classOf[Exception])
-  def after {
-    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-
   @Test
   @throws(classOf[Exception])
   def testSumOfAllNeighborsNoValue {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
+    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
+    .collect.toList
+    expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -72,9 +54,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
+    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList
+    expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -84,9 +66,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
     val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
-    result.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
+    val res = result.collect.toList
+    expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   @Test
@@ -97,9 +79,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution
       .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
     val result = graph.groupReduceOnNeighbors(new
         SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
-    result.writeAsCsv(resultPath)
-    env.execute
-    expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
+    val res = result.collect.toList
+    expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)"
+    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
   }
 
   final class SumNeighbors extends ReduceNeighborsFunction[Long] {