You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/04 21:33:09 UTC

[7/7] flink git commit: [gelly] refactored tests; removed duplicate data from TestGraphUtils

[gelly] refactored tests; removed duplicate data from TestGraphUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1e03062
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1e03062
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1e03062

Branch: refs/heads/master
Commit: e1e03062ccab7db0534a866fa5a984095e2b5eef
Parents: b529b62
Author: vasia <va...@gmail.com>
Authored: Wed Mar 4 15:49:00 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Wed Mar 4 21:31:56 2015 +0100

----------------------------------------------------------------------
 .../graph/example/LabelPropagationExample.java  |   9 +-
 .../SingleSourceShortestPathsExample.java       |   4 +-
 .../apache/flink/graph/test/DegreesITCase.java  | 171 ------
 .../flink/graph/test/FromCollectionITCase.java  | 120 -----
 .../flink/graph/test/GraphCreationITCase.java   | 170 ------
 .../test/GraphCreationWithMapperITCase.java     | 158 ------
 .../flink/graph/test/GraphMutationsITCase.java  | 273 ----------
 .../flink/graph/test/GraphOperationsITCase.java | 267 ----------
 .../flink/graph/test/JoinWithEdgesITCase.java   | 519 ------------------
 .../graph/test/JoinWithVerticesITCase.java      | 218 --------
 .../test/LabelPropagationExampleITCase.java     | 176 -------
 .../apache/flink/graph/test/MapEdgesITCase.java | 223 --------
 .../flink/graph/test/MapVerticesITCase.java     | 233 ---------
 .../graph/test/ReduceOnEdgesMethodsITCase.java  | 317 -----------
 .../test/ReduceOnNeighborMethodsITCase.java     | 303 -----------
 .../apache/flink/graph/test/TestGraphUtils.java |  62 ++-
 .../example/LabelPropagationExampleITCase.java  | 143 +++++
 .../graph/test/operations/DegreesITCase.java    | 172 ++++++
 .../test/operations/FromCollectionITCase.java   | 121 +++++
 .../test/operations/GraphCreationITCase.java    | 171 ++++++
 .../GraphCreationWithMapperITCase.java          | 159 ++++++
 .../test/operations/GraphMutationsITCase.java   | 274 ++++++++++
 .../test/operations/GraphOperationsITCase.java  | 268 ++++++++++
 .../test/operations/JoinWithEdgesITCase.java    | 520 +++++++++++++++++++
 .../test/operations/JoinWithVerticesITCase.java | 219 ++++++++
 .../graph/test/operations/MapEdgesITCase.java   | 224 ++++++++
 .../test/operations/MapVerticesITCase.java      | 234 +++++++++
 .../operations/ReduceOnEdgesMethodsITCase.java  | 318 ++++++++++++
 .../ReduceOnNeighborMethodsITCase.java          | 304 +++++++++++
 29 files changed, 3174 insertions(+), 3176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index 78cb5d5..e399b3f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -85,8 +85,8 @@ public class LabelPropagationExample implements ProgramDescription {
 	private static boolean parseParameters(String[] args) {
 
 		if(args.length > 0) {
-			if(args.length != 5) {
-				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+			if(args.length != 4) {
+				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
 				return false;
 			}
 
@@ -94,13 +94,12 @@ public class LabelPropagationExample implements ProgramDescription {
 			vertexInputPath = args[0];
 			edgeInputPath = args[1];
 			outputPath = args[2];
-			numVertices = Integer.parseInt(args[3]);
-			maxIterations = Integer.parseInt(args[4]);
+			maxIterations = Integer.parseInt(args[3]);
 		} else {
 			System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
 			System.out.println("  Provide parameters to read input data from files.");
 			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>");
+			System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index c590f30..6c85397 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -100,6 +100,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 		return true;
 	}
 
+	@SuppressWarnings("serial")
 	private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
 			return env.readCsvFile(verticesInputPath)
@@ -119,6 +120,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 		}
 	}
 
+	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
 			return env.readCsvFile(edgesInputPath)
@@ -128,7 +130,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 						@Override
 						public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
-							return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
+							return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
 						}
 					});
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
deleted file mode 100644
index 96a6d20..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DegreesITCase extends MultipleProgramsTestBase {
-
-	public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testOutDegrees() throws Exception {
-		/*
-		* Test outDegrees()
-		*/
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2\n" +
-                    "2,1\n" +
-                    "3,2\n" +
-                    "4,1\n" +
-                    "5,1\n";
-    }
-
-	@Test
-	public void testOutDegreesWithNoOutEdges() throws Exception {
-		/*
-		 * Test outDegrees() no outgoing edges
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,3\n" +
-                "2,1\n" +
-                "3,1\n" +
-                "4,1\n" +
-                "5,0\n";
-    }
-
-	@Test
-	public void testInDegrees() throws Exception {
-		/*
-		 * Test inDegrees()
-		 */
-	    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-	    Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-	            TestGraphUtils.getLongLongEdgeData(env), env);
-
-	    graph.inDegrees().writeAsCsv(resultPath);
-	    env.execute();
-	    expectedResult = "1,1\n" +
-		            "2,1\n" +
-		            "3,2\n" +
-		            "4,1\n" +
-		            "5,2\n";
-    }
-
-	@Test
-	public void testInDegreesWithNoInEdge() throws Exception {
-		/*
-		 * Test inDegrees() no ingoing edge
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
-
-        graph.inDegrees().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,0\n" +
-	                "2,1\n" +
-	                "3,1\n" +
-	                "4,1\n" +
-	                "5,3\n";
-    }
-
-	@Test
-	public void testGetDegrees() throws Exception {
-		/*
-		 * Test getDegrees()
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        graph.getDegrees().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,3\n" +
-	                "2,2\n" +
-	                "3,4\n" +
-	                "4,2\n" +
-	                "5,3\n";
-    }
-
-	@Test
-	public void testGetDegreesWithDisconnectedData() throws Exception {
-        /*
-		 * Test getDegrees() with disconnected data
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, NullValue, Long> graph =
-                Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-
-        graph.outDegrees().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,2\n" +
-                "2,1\n" +
-                "3,0\n" +
-                "4,1\n" +
-                "5,0\n";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
deleted file mode 100644
index 5259143..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class FromCollectionITCase extends MultipleProgramsTestBase {
-
-	public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testFromCollectionVerticesEdges() throws Exception {
-		/*
-		 * Test fromCollection(vertices, edges):
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
-                TestGraphUtils.getLongLongEdges(), env);
-
-        graph.getEdges().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,2,12\n" +
-	                "1,3,13\n" +
-	                "2,3,23\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testFromCollectionEdgesNoInitialValue() throws Exception {
-        /*
-         * Test fromCollection(edges) with no initial value for the vertices
-         */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
-        		env);
-
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,(null)\n" +
-	                "2,(null)\n" +
-	                "3,(null)\n" +
-	                "4,(null)\n" +
-	                "5,(null)\n";
-    }
-
-	@Test
-	public void testFromCollectionEdgesWithInitialValue() throws Exception {
-        /*
-         * Test fromCollection(edges) with vertices initialised by a
-         * function that takes the id and doubles it
-         */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
-                new InitVerticesMapper(), env);
-
-        graph.getVertices().writeAsCsv(resultPath);
-        env.execute();
-        expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,8\n" +
-	                "5,10\n";
-    }
-
-	@SuppressWarnings("serial")
-	private static final class InitVerticesMapper implements MapFunction<Long, Long> {
-        public Long map(Long vertexId) {
-            return vertexId * 2;
-        }
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
deleted file mode 100644
index 4cbdd90..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphCreationITCase extends MultipleProgramsTestBase {
-
-	public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testCreateWithoutVertexValues() throws Exception {
-	/*
-	 * Test create() with edge dataset and no vertex values
-     */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,(null)\n" +
-					"2,(null)\n" +
-					"3,(null)\n" +
-					"4,(null)\n" +
-					"5,(null)\n";
-	}
-
-	@Test
-	public void testCreateWithMapper() throws Exception {
-	/*
-	 * Test create() with edge dataset and a mapper that assigns the id as value
-     */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
-				new AssignIdAsValueMapper(), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,1\n" +
-					"2,2\n" +
-					"3,3\n" +
-					"4,4\n" +
-					"5,5\n";
-	}
-
-	@Test
-	public void testCreateWithCustomVertexValue() throws Exception {
-		/*
-		 * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
-				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,(2.0,0)\n" +
-				"2,(4.0,1)\n" +
-				"3,(6.0,2)\n" +
-				"4,(8.0,3)\n" +
-				"5,(10.0,4)\n";
-	}
-
-	@Test
-	public void testValidate() throws Exception {
-		/*
-		 * Test validate():
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
-		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-
-		result.writeAsText(resultPath);
-		env.execute();
-
-		expectedResult = "true\n";
-	}
-
-	@Test
-	public void testValidateWithInvalidIds() throws Exception {
-		/*
-		 * Test validate() - invalid vertex ids
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
-		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-		result.writeAsText(resultPath);
-		env.execute();
-
-		expectedResult = "false\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
-		public Long map(Long vertexId) {
-			return vertexId;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignCustomVertexValueMapper implements
-		MapFunction<Long, DummyCustomParameterizedType<Double>> {
-
-		DummyCustomParameterizedType<Double> dummyValue =
-				new DummyCustomParameterizedType<Double>();
-
-		public DummyCustomParameterizedType<Double> map(Long vertexId) {
-			dummyValue.setIntField(vertexId.intValue()-1);
-			dummyValue.setTField(vertexId*2.0);
-			return dummyValue;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
deleted file mode 100644
index 24f7c82..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
-
-	public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testWithDoubleValueMapper() throws Exception {
-		/*
-		 * Test create() with edge dataset and a mapper that assigns a double constant as value
-	     */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
-				new AssignDoubleValueMapper(), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,0.1\n" +
-				"2,0.1\n" +
-				"3,0.1\n" +
-				"4,0.1\n" +
-				"5,0.1\n";
-	}
-
-	@Test
-	public void testWithTuple2ValueMapper() throws Exception {
-		/*
-		 * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
-				TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,(2,42)\n" +
-				"2,(4,42)\n" +
-				"3,(6,42)\n" +
-				"4,(8,42)\n" +
-				"5,(10,42)\n";
-	}
-
-	@Test
-	public void testWithConstantValueMapper() throws Exception {
-	/*
-	 * Test create() with edge dataset with String key type
-	 * and a mapper that assigns a double constant as value
-	 */
-	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-	Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
-			new AssignDoubleConstantMapper(), env);
-
-	graph.getVertices().writeAsCsv(resultPath);
-	env.execute();
-	expectedResult = "1,0.1\n" +
-			"2,0.1\n" +
-			"3,0.1\n" +
-			"4,0.1\n" +
-			"5,0.1\n";
-	}
-
-	@Test
-	public void testWithDCustomValueMapper() throws Exception {
-		/*
-		 * Test create() with edge dataset and a mapper that assigns a custom vertex value
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
-				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
-
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,(F,0)\n" +
-				"2,(F,1)\n" +
-				"3,(F,2)\n" +
-				"4,(F,3)\n" +
-				"5,(F,4)\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
-		public Double map(Long value) {
-			return 0.1d;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
-		public Tuple2<Long, Long> map(Long vertexId) {
-			return new Tuple2<Long, Long>(vertexId*2, 42l);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
-		public Double map(String value) {
-			return 0.1d;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
-		public DummyCustomType map(Long vertexId) {
-			return new DummyCustomType(vertexId.intValue()-1, false);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
deleted file mode 100644
index 3af8943..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphMutationsITCase extends MultipleProgramsTestBase {
-
-	public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testAddVertex() throws Exception {
-		/*
-		 * Test addVertex() -- simple case
-		 */	
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n" +
-				"6,1,61\n";	
-	}
-
-	@Test
-	public void testAddVertexExisting() throws Exception {
-		/*
-		 * Test addVertex() -- add an existing vertex
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(1L, 5L, 15L));
-		graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-					"1,3,13\n" +
-					"1,5,15\n" +
-					"2,3,23\n" +
-					"3,4,34\n" +
-					"3,5,35\n" +
-					"4,5,45\n" +
-					"5,1,51\n";
-	}
-
-	@Test
-	public void testAddVertexNoEdges() throws Exception {
-		/*
-		 * Test addVertex() -- add vertex with empty edge set
-		 */	
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges);
-		graph.getVertices().writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,1\n" +
-			"2,2\n" +
-			"3,3\n" +
-			"4,4\n" +
-			"5,5\n" +
-			"6,6\n";
-	}
-
-	@Test
-	public void testRemoveVertex() throws Exception {
-		/*
-		 * Test removeVertex() -- simple case
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n";
-	}
-
-	@Test
-	public void testRemoveInvalidVertex() throws Exception {
-		/*
-		 * Test removeVertex() -- remove an invalid vertex
-		 */	
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n";
-	}
-	
-	@Test
-	public void testAddEdge() throws Exception {
-		/*
-		 * Test addEdge() -- simple case
-		 */
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
-				61L);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n" +
-				"6,1,61\n";	
-	}
-	
-	@Test
-	public void testAddExistingEdge() throws Exception {
-		/*
-		 * Test addEdge() -- add already existing edge
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
-				12L);
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n";	
-	}
-	
-	@Test
-	public void testRemoveVEdge() throws Exception {
-		/*
-		 * Test removeEdge() -- simple case
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n";
-	}
-	
-	@Test
-	public void testRemoveInvalidEdge() throws Exception {
-		/*
-		 * Test removeEdge() -- invalid edge
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
-		graph.getEdges().writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
deleted file mode 100644
index f194a60..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphOperationsITCase extends MultipleProgramsTestBase {
-
-	public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testUndirected() throws Exception {
-		/*
-		 * Test getUndirected()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		graph.getUndirected().getEdges().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,2,12\n" + "2,1,12\n" +
-					"1,3,13\n" + "3,1,13\n" +
-					"2,3,23\n" + "3,2,23\n" +
-					"3,4,34\n" + "4,3,34\n" +
-					"3,5,35\n" + "5,3,35\n" +
-					"4,5,45\n" + "5,4,45\n" +
-					"5,1,51\n" + "1,5,51\n";
-	}
-
-	@Test
-	public void testReverse() throws Exception {
-		/*
-		 * Test reverse()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		graph.reverse().getEdges().writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "2,1,12\n" +
-					"3,1,13\n" +
-					"3,2,23\n" +
-					"4,3,34\n" +
-					"5,3,35\n" +
-					"5,4,45\n" +
-					"1,5,51\n";
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testSubGraph() throws Exception {
-		/*
-		 * Test subgraph:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
-						   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
-							   return (vertex.getValue() > 2);
-						   }
-					   },
-				new FilterFunction<Edge<Long, Long>>() {
-					public boolean filter(Edge<Long, Long> edge) throws Exception {
-						return (edge.getValue() > 34);
-					}
-				}).getEdges().writeAsCsv(resultPath);
-
-		env.execute();
-		expectedResult = "3,5,35\n" +
-					"4,5,45\n";
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testFilterVertices() throws Exception {
-		/*
-		 * Test filterOnVertices:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
-			public boolean filter(Vertex<Long, Long> vertex) throws Exception {
-				return (vertex.getValue() > 2);
-			}
-		}).getEdges().writeAsCsv(resultPath);
-
-		env.execute();
-		expectedResult =  "3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n";
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testFilterEdges() throws Exception {
-		/*
-		 * Test filterOnEdges:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
-			public boolean filter(Edge<Long, Long> edge) throws Exception {
-				return (edge.getValue() > 34);
-			}
-		}).getEdges().writeAsCsv(resultPath);
-
-		env.execute();
-		expectedResult = "3,5,35\n" +
-					"4,5,45\n" +
-					"5,1,51\n";
-	}
-
-	@Test
-	public void testNumberOfVertices() throws Exception {
-		/*
-		 * Test numberOfVertices()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfVertices().writeAsText(resultPath);
-
-		env.execute();
-		expectedResult = "5";
-	}
-
-	@Test
-	public void testNumberOfEdges() throws Exception {
-		/*
-		 * Test numberOfEdges()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfEdges().writeAsText(resultPath);
-
-		env.execute();
-		expectedResult = "7";
-	}
-
-	@Test
-	public void testVertexIds() throws Exception {
-		/*
-		 * Test getVertexIds()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.getVertexIds().writeAsText(resultPath);
-
-		env.execute();
-		expectedResult = "1\n2\n3\n4\n5\n";
-	}
-
-	@Test
-	public void testEdgesIds() throws Exception {
-		/*
-		 * Test getEdgeIds()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.getEdgeIds().writeAsCsv(resultPath);
-
-		env.execute();
-		expectedResult = "1,2\n" + "1,3\n" +
-				"2,3\n" + "3,4\n" +
-				"3,5\n" + "4,5\n" +
-				"5,1\n";
-	}
-
-	@Test
-	public void testUnion() throws Exception {
-		/*
-		 * Test union()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
-		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-
-		graph = graph.union(Graph.fromCollection(vertices, edges, env));
-
-		graph.getEdges().writeAsCsv(resultPath);
-
-		env.execute();
-
-		expectedResult = "1,2,12\n" +
-					"1,3,13\n" +
-					"2,3,23\n" +
-					"3,4,34\n" +
-					"3,5,35\n" +
-					"4,5,45\n" +
-					"5,1,51\n" +
-					"6,1,61\n";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
deleted file mode 100644
index 6f4f6a8..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
-
-	public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testWithEdgesInputDataset() throws Exception {
-		/*
-		 * Test joinWithEdges with the input DataSet parameter identical
-		 * to the edge DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
-                        .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,70\n" +
-	                "4,5,90\n" +
-	                "5,1,102\n";
-    }
-
-	@Test
-	public void testWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testWithLessElementsDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testWithCustomType() throws Exception {
-	    /*
-	     * Test joinWithEdges with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,20\n" +
-	                "2,3,30\n" +
-	                "3,4,40\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testWithEdgesOnSource() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet parameter identical
-		 * to the edge DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
-                        .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,25\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,69\n" +
-	                "4,5,90\n" +
-	                "5,1,102\n";
-    }
-
-	@Test
-	public void testOnSourceWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,25\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testOnSourceWithDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testOnSourceWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,20\n" +
-	                "1,3,20\n" +
-	                "2,3,60\n" +
-	                "3,4,80\n" +
-	                "3,5,80\n" +
-	                "4,5,120\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testOnSourceWithCustom() throws Exception {
-	    /*
-	     * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,10\n" +
-	                "2,3,30\n" +
-	                "3,4,40\n" +
-	                "3,5,40\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testWithEdgesOnTarget() throws Exception {
-    /*
-	 * Test joinWithEdgesOnTarget with the input DataSet parameter identical
-	 * to the edge DataSet
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
-                        .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,36\n" +
-	                "3,4,68\n" +
-	                "3,5,70\n" +
-	                "4,5,80\n" +
-	                "5,1,102\n";
-    }
-
-	@Test
-	public void testWithOnTargetWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,36\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testOnTargetWithDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@Test
-	public void testOnTargetWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
-                new DoubleValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,20\n" +
-	                "1,3,40\n" +
-	                "2,3,40\n" +
-	                "3,4,80\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,140\n";
-    }
-
-	@Test
-	public void testOnTargetWithCustom() throws Exception {
-	    /*
-	     * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
-                new CustomValueMapper());
-
-        result.getEdges().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,20\n" +
-	                "2,3,20\n" +
-	                "3,4,40\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-    }
-
-	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
-        public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
-                    edge.getTarget(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1 * 2;
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getSource(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getTarget(), true);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
deleted file mode 100644
index 0574265..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
-
-	public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testJoinWithVertexSet() throws Exception {
-		/*
-		 * Test joinWithVertices with the input DataSet parameter identical
-		 * to the vertex DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices()
-                        .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-       expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,8\n" +
-	                "5,10\n";
-    }
-
-	@Test
-	public void testWithLessElements() throws Exception {
-	/*
-	 * Test joinWithVertices with the input DataSet passed as a parameter containing
-	 * less elements than the vertex DataSet, but of the same type
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,4\n" +
-	                "5,5\n";
-    }
-
-	@Test
-	public void testWithDifferentType() throws Exception {
-	/*
-	 * Test joinWithVertices with the input DataSet passed as a parameter containing
-	 * less elements than the vertex DataSet and of a different type(Boolean)
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,4\n" +
-	                "5,5\n";
-    }
-
-	@Test
-	public void testWithDifferentKeys() throws Exception {
-		/*
-		 * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
-                new ProjectSecondMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,10\n" +
-	                "2,20\n" +
-	                "3,30\n" +
-	                "4,40\n" +
-	                "5,5\n";
-    }
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test joinWithVertices with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
-                new CustomValueMapper());
-
-        result.getVertices().writeAsCsv(resultPath);
-        env.execute();
-
-        expectedResult = "1,10\n" +
-	                "2,20\n" +
-	                "3,30\n" +
-	                "4,40\n" +
-	                "5,5\n";
-    }
-
-	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
-            return new Tuple2<Long, Boolean>(vertex.getId(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1;
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
deleted file mode 100755
index dfb0f3f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagationExample;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
-
-	public LabelPropagationExampleITCase(ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of label propagation example with a simple graph
-		 */
-
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 30\n" +
-				"4 40\n" +
-				"5 40\n" +
-				"6 40\n" +
-				"7 70\n";
-
-		final String edges = "1 3\n" +
-				"2 3\n" +
-				"4 7\n" +
-				"5 7\n" +
-				"6 7\n" +
-				"7 3\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "7", "1"});
-
-		expectedResult = "1,10\n" +
-			"2,10\n" +
-			"3,10\n" +
-			"4,40\n" +
-			"5,40\n" +
-			"6,40\n" +
-			"7,40\n";
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test the label propagation example where a tie must be broken
-		 */
-
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 10\n" +
-				"4 10\n" +
-				"5 0\n" +
-				"6 20\n" +
-				"7 20\n" +
-				"8 20\n" +
-				"9 20\n";
-
-		final String edges = "1 5\n" +
-				"2 5\n" +
-				"3 5\n" +
-				"4 5\n" +
-				"6 5\n" +
-				"7 5\n" +
-				"8 5\n" +
-				"9 5\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "9", "1"});
-
-		expectedResult = "1,10\n" +
-				"2,10\n" +
-				"3,10\n" +
-				"4,10\n" +
-				"5,20\n" +
-				"6,20\n" +
-				"7,20\n" +
-				"8,20\n" +
-				"9,20\n";
-	}
-
-	@Test
-	public void testTermination() throws Exception {
-		/*
-		 * Test the label propagation example where the algorithm terminates on the first iteration
-		 */
-
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 10\n" +
-				"4 40\n" +
-				"5 40\n" +
-				"6 40\n";
-
-		final String edges = "1 2\n" +
-				"2 3\n" +
-				"3 1\n" +
-				"4 5\n" +
-				"5 6\n" +
-				"6 4\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagationExample.main(new String[]{verticesPath, edgesPath, resultPath, "6", "2"});
-
-		expectedResult = "1,10\n" +
-				"2,10\n" +
-				"3,10\n" +
-				"4,40\n" +
-				"5,40\n" +
-				"6,40\n";
-	}
-
-	// -------------------------------------------------------------------------
-	//  Util methods
-	// -------------------------------------------------------------------------
-
-	private String createTempFile(final String rows) throws Exception {
-		File tempFile = tempFolder.newFile();
-		Files.write(rows, tempFile, Charsets.UTF_8);
-		return tempFile.toURI().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
deleted file mode 100644
index f7a585d..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapEdgesITCase extends MultipleProgramsTestBase {
-
-	public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testWithSameValue() throws Exception {
-		/*
-		 * Test mapEdges() keeping the same value type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
-		
-		mappedEdges.writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,2,13\n" +
-				"1,3,14\n" +
-				"2,3,24\n" +
-				"3,4,35\n" +
-				"3,5,36\n" + 
-				"4,5,46\n" + 
-				"5,1,52\n";
-	}
-
-	@Test
-	public void testWithStringValue() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to String
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
-		
-		mappedEdges.writeAsCsv(resultPath);
-		env.execute();
-		expectedResult = "1,2,string(12)\n" +
-				"1,3,string(13)\n" +
-				"2,3,string(23)\n" +
-				"3,4,string(34)\n" +
-				"3,5,string(35)\n" + 
-				"4,5,string(45)\n" + 
-				"5,1,string(51)\n";
-	}
-
-	@Test
-	public void testWithTuple1Type() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a Tuple1
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
-		
-		mappedEdges.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,(12)\n" +
-				"1,3,(13)\n" +
-				"2,3,(23)\n" +
-				"3,4,(34)\n" +
-				"3,5,(35)\n" + 
-				"4,5,(45)\n" + 
-				"5,1,(51)\n";
-	}
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
-		
-		mappedEdges.writeAsCsv(resultPath);
-		env.execute();
-
-		expectedResult = "1,2,(T,12)\n" +
-			"1,3,(T,13)\n" +
-			"2,3,(T,23)\n" +
-			"3,4,(T,34)\n" +
-			"3,5,(T,35)\n" + 
-			"4,5,(T,45)\n" + 
-			"5,1,(T,51)\n";
-	}
-
-	@Test
-	public void testWithParametrizedCustomType() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a parameterized custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
-				new ToCustomParametrizedTypeMapper()).getEdges();
-		
-		mappedEdges.writeAsCsv(resultPath);
-		env.execute();
-	
-		expectedResult = "1,2,(12.0,12)\n" +
-			"1,3,(13.0,13)\n" +
-			"2,3,(23.0,23)\n" +
-			"3,4,(34.0,34)\n" +
-			"3,5,(35.0,35)\n" + 
-			"4,5,(45.0,45)\n" + 
-			"5,1,(51.0,51)\n";
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
-		public Long map(Edge<Long, Long> edge) throws Exception {
-			return edge.getValue()+1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
-		public String map(Edge<Long, Long> edge) throws Exception {
-			return String.format("string(%d)", edge.getValue());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
-		public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
-			tupleValue.setFields(edge.getValue());
-			return tupleValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
-		public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
-			DummyCustomType dummyValue = new DummyCustomType();
-			dummyValue.setIntField(edge.getValue().intValue());						
-			return dummyValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, 
-		DummyCustomParameterizedType<Double>> {
-
-		public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-			dummyValue.setIntField(edge.getValue().intValue());
-			dummyValue.setTField(new Double(edge.getValue()));						
-			return dummyValue;
-		}
-	}
-}
\ No newline at end of file