You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:41 UTC

[03/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
deleted file mode 100644
index ffc9da9..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphOperationsITCase extends MultipleProgramsTestBase {
-
-	public GraphOperationsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testUndirected() throws Exception {
-		/*
-		 * Test getUndirected()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-        DataSet<Edge<Long,Long>> data = graph.getUndirected().getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-        
-		expectedResult = "1,2,12\n" + "2,1,12\n" +
-					"1,3,13\n" + "3,1,13\n" +
-					"2,3,23\n" + "3,2,23\n" +
-					"3,4,34\n" + "4,3,34\n" +
-					"3,5,35\n" + "5,3,35\n" +
-					"4,5,45\n" + "5,4,45\n" +
-					"5,1,51\n" + "1,5,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testReverse() throws Exception {
-		/*
-		 * Test reverse()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-        DataSet<Edge<Long,Long>> data = graph.reverse().getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-        
-		expectedResult = "2,1,12\n" +
-					"3,1,13\n" +
-					"3,2,23\n" +
-					"4,3,34\n" +
-					"5,3,35\n" +
-					"5,4,45\n" +
-					"1,5,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testSubGraph() throws Exception {
-		/*
-		 * Test subgraph:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long,Long>> data= graph.subgraph(new FilterFunction<Vertex<Long, Long>>() {
-						   public boolean filter(Vertex<Long, Long> vertex) throws Exception {
-							   return (vertex.getValue() > 2);
-						   }
-					   },
-				new FilterFunction<Edge<Long, Long>>() {
-					public boolean filter(Edge<Long, Long> edge) throws Exception {
-						return (edge.getValue() > 34);
-					}
-				}).getEdges();
-
-        List<Edge<Long, Long>> result= data.collect();
-        
-		expectedResult = "3,5,35\n" +
-					"4,5,45\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testFilterVertices() throws Exception {
-		/*
-		 * Test filterOnVertices:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long,Long>> data = graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() {
-			public boolean filter(Vertex<Long, Long> vertex) throws Exception {
-				return (vertex.getValue() > 2);
-			}
-		}).getEdges();
-
-        List<Edge<Long, Long>> result= data.collect();
-		
-		expectedResult =  "3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	@Test
-	public void testFilterEdges() throws Exception {
-		/*
-		 * Test filterOnEdges:
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long,Long>> data = graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() {
-			public boolean filter(Edge<Long, Long> edge) throws Exception {
-				return (edge.getValue() > 34);
-			}
-		}).getEdges();
-
-        List<Edge<Long, Long>> result = data.collect();
-        
-		expectedResult = "3,5,35\n" +
-					"4,5,45\n" +
-					"5,1,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testNumberOfVertices() throws Exception {
-		/*
-		 * Test numberOfVertices()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		DataSet<Long> data = env.fromElements(graph.numberOfVertices());
-
-        List<Long> result= data.collect();
-        
-		expectedResult = "5";
-		
-		compareResultAsText(result, expectedResult);
-	}
-
-	@Test
-	public void testNumberOfEdges() throws Exception {
-		/*
-		 * Test numberOfEdges()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		DataSet<Long> data = env.fromElements(graph.numberOfEdges());
-
-        List<Long> result= data.collect();
-        
-		expectedResult = "7";
-		
-		compareResultAsText(result, expectedResult);
-	}
-
-	@Test
-	public void testVertexIds() throws Exception {
-		/*
-		 * Test getVertexIds()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Long> data = graph.getVertexIds();
-        List<Long> result= data.collect();
-        
-		expectedResult = "1\n2\n3\n4\n5\n";
-		
-		compareResultAsText(result, expectedResult);
-	}
-
-	@Test
-	public void testEdgesIds() throws Exception {
-		/*
-		 * Test getEdgeIds()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Tuple2<Long,Long>> data = graph.getEdgeIds();
-        List<Tuple2<Long, Long>> result= data.collect();
-        
-		expectedResult = "1,2\n" + "1,3\n" +
-				"2,3\n" + "3,4\n" +
-				"3,5\n" + "4,5\n" +
-				"5,1\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testUnion() throws Exception {
-		/*
-		 * Test union()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
-		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
-
-		graph = graph.union(Graph.fromCollection(vertices, edges, env));
-
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-		expectedResult = "1,2,12\n" +
-					"1,3,13\n" +
-					"2,3,23\n" +
-					"3,4,34\n" +
-					"3,5,35\n" +
-					"4,5,45\n" +
-					"5,1,51\n" +
-					"6,1,61\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testDifference() throws Exception {
-		/*Test  difference() method  by checking    the output  for getEdges()   on  the resultant   graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		Graph<Long, Long, Long> graph2 = Graph.fromDataSet(TestGraphUtils.getLongLongVertexDataDifference(env),
-				TestGraphUtils.getLongLongEdgeDataDifference(env), env);
-
-		graph = graph.difference(graph2);
-
-		List<Edge<Long, Long>> result = graph.getEdges().collect();
-
-		expectedResult = "4,5,45\n";
-		compareResultAsTuples(result, expectedResult);
-	}
-
-
-	@Test
-	public void testDifferenceVertices() throws Exception{
-		/*Test  difference() method  by checking    the output  for getVertices()   on  the resultant   graph
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		Graph<Long, Long, Long> graph2 = Graph.fromDataSet(TestGraphUtils.getLongLongVertexDataDifference(env),
-				TestGraphUtils.getLongLongEdgeDataDifference(env), env);
-
-		graph = graph.difference(graph2);
-
-		List<Vertex<Long, Long>> result = graph.getVertices().collect();
-
-		expectedResult =  "2,2\n" +
-				"4,4\n" +
-				"5,5\n" ;
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testDifference2() throws Exception {
-		/*
-		 * Test difference() such that no common vertices are there
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Vertex<Long, Long>> vertex = env.fromElements(new Vertex<Long, Long>(6L, 6L));
-
-		Graph<Long, Long, Long> graph2 = Graph.fromDataSet(vertex,TestGraphUtils.getLongLongEdgeDataDifference2(env),env);
-
-		graph = graph.difference(graph2);
-
-		List<Edge<Long, Long>> result = graph.getEdges().collect();
-
-		expectedResult =	"1,2,12\n" +
-				"1,3,13\n" +
-				"2,3,23\n" +
-				"3,4,34\n" +
-				"3,5,35\n" +
-				"4,5,45\n" +
-				"5,1,51\n" ;
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testTriplets() throws Exception {
-		/*
-		 * Test getTriplets()
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-        DataSet<Triplet<Long,Long,Long>> data = graph.getTriplets();
-        List<Triplet<Long,Long,Long>> result= data.collect();
-
-		expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" +
-				"2,3,2,3,23\n" + "3,4,3,4,34\n" +
-				"3,5,3,5,35\n" + "4,5,4,5,45\n" +
-				"5,1,5,1,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
deleted file mode 100644
index e406ce2..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
-
-	public JoinWithEdgesITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testWithEdgesInputDataset() throws Exception {
-		/*
-		 * Test joinWithEdges with the input DataSet parameter identical
-		 * to the edge DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges()
-                        .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,70\n" +
-	                "4,5,90\n" +
-	                "5,1,102\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithLessElementsDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdges(graph.getEdges().first(3)
-                        .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
-                new DoubleValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithCustomType() throws Exception {
-	    /*
-	     * Test joinWithEdges with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
-                new CustomValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,20\n" +
-	                "2,3,30\n" +
-	                "3,4,40\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithEdgesOnSource() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet parameter identical
-		 * to the edge DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges()
-                        .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,25\n" +
-	                "2,3,46\n" +
-	                "3,4,68\n" +
-	                "3,5,69\n" +
-	                "4,5,90\n" +
-	                "5,1,102\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnSourceWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceAndValueMapper()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,25\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnSourceWithDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
-                        .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnSourceWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
-                new DoubleValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,20\n" +
-	                "1,3,20\n" +
-	                "2,3,60\n" +
-	                "3,4,80\n" +
-	                "3,5,80\n" +
-	                "4,5,120\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnSourceWithCustom() throws Exception {
-	    /*
-	     * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
-                new CustomValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,10\n" +
-	                "2,3,30\n" +
-	                "3,4,40\n" +
-	                "3,5,40\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithEdgesOnTarget() throws Exception {
-    /*
-	 * Test joinWithEdgesOnTarget with the input DataSet parameter identical
-	 * to the edge DataSet
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges()
-                        .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,36\n" +
-	                "3,4,68\n" +
-	                "3,5,70\n" +
-	                "4,5,80\n" +
-	                "5,1,102\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithOnTargetWithLessElements() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet, but of the same type
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,36\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnTargetWithDifferentType() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
-		 * less elements than the edge DataSet and of a different type(Boolean)
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
-                        .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,24\n" +
-	                "1,3,26\n" +
-	                "2,3,46\n" +
-	                "3,4,34\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnTargetWithNoCommonKeys() throws Exception {
-	    /*
-		 * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
-                new DoubleValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,20\n" +
-	                "1,3,40\n" +
-	                "2,3,40\n" +
-	                "3,4,80\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,140\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testOnTargetWithCustom() throws Exception {
-	    /*
-	     * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
-                new CustomValueMapper());
-
-        DataSet<Edge<Long,Long>> data = res.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
-
-        expectedResult = "1,2,10\n" +
-	                "1,3,20\n" +
-	                "2,3,20\n" +
-	                "3,4,40\n" +
-	                "3,5,35\n" +
-	                "4,5,45\n" +
-	                "5,1,51\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
-        public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
-                    edge.getTarget(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1 * 2;
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getSource(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
-        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getTarget(), true);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
deleted file mode 100644
index 22a5535..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
-
-	public JoinWithVerticesITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testJoinWithVertexSet() throws Exception {
-		/*
-		 * Test joinWithVertices with the input DataSet parameter identical
-		 * to the vertex DataSet
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices()
-                        .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
-
-       expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,8\n" +
-	                "5,10\n";
-       
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithLessElements() throws Exception {
-	/*
-	 * Test joinWithVertices with the input DataSet passed as a parameter containing
-	 * less elements than the vertex DataSet, but of the same type
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
-
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
-
-        expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,4\n" +
-	                "5,5\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithDifferentType() throws Exception {
-	/*
-	 * Test joinWithVertices with the input DataSet passed as a parameter containing
-	 * less elements than the vertex DataSet and of a different type(Boolean)
-	 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices().first(3)
-                        .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper());
-
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
-
-        expectedResult = "1,2\n" +
-	                "2,4\n" +
-	                "3,6\n" +
-	                "4,4\n" +
-	                "5,5\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithDifferentKeys() throws Exception {
-		/*
-		 * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet
-		 * - the iterator becomes empty.
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
-                new ProjectSecondMapper());
-
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
-
-        expectedResult = "1,10\n" +
-	                "2,20\n" +
-	                "3,30\n" +
-	                "4,40\n" +
-	                "5,5\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test joinWithVertices with a DataSet containing custom parametrised type input values
-		 */
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-                TestGraphUtils.getLongLongEdgeData(env), env);
-
-        Graph<Long, Long, Long> res = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
-                new CustomValueMapper());
-
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
-
-        expectedResult = "1,10\n" +
-	                "2,20\n" +
-	                "3,30\n" +
-	                "4,40\n" +
-	                "5,5\n";
-        
-		compareResultAsTuples(result, expectedResult);
-    }
-
-	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
-        public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
-            return new Tuple2<Long, Boolean>(vertex.getId(), true);
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
-            }
-            else {
-                return tuple.f0;
-            }
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1;
-        }
-    }
-
-	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
deleted file mode 100644
index 35f7b0e..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapEdgesITCase extends MultipleProgramsTestBase {
-
-	public MapEdgesITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testWithSameValue() throws Exception {
-		/*
-		 * Test mapEdges() keeping the same value type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
-        List<Edge<Long, Long>> result= mappedEdges.collect();
-        
-		expectedResult = "1,2,13\n" +
-				"1,3,14\n" +
-				"2,3,24\n" +
-				"3,4,35\n" +
-				"3,5,36\n" + 
-				"4,5,46\n" + 
-				"5,1,52\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithStringValue() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to String
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
-		List<Edge<Long, String>> result= mappedEdges.collect();
-		
-		expectedResult = "1,2,string(12)\n" +
-				"1,3,string(13)\n" +
-				"2,3,string(23)\n" +
-				"3,4,string(34)\n" +
-				"3,5,string(35)\n" + 
-				"4,5,string(45)\n" + 
-				"5,1,string(51)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithTuple1Type() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a Tuple1
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
-		List<Edge<Long, Tuple1<Long>>> result= mappedEdges.collect();
-
-		expectedResult = "1,2,(12)\n" +
-				"1,3,(13)\n" +
-				"2,3,(23)\n" +
-				"3,4,(34)\n" +
-				"3,5,(35)\n" + 
-				"4,5,(45)\n" + 
-				"5,1,(51)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
-		List<Edge<Long, DummyCustomType>> result= mappedEdges.collect();
-
-		expectedResult = "1,2,(T,12)\n" +
-			"1,3,(T,13)\n" +
-			"2,3,(T,23)\n" +
-			"3,4,(T,34)\n" +
-			"3,5,(T,35)\n" + 
-			"4,5,(T,45)\n" + 
-			"5,1,(T,51)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithParametrizedCustomType() throws Exception {
-		/*
-		 * Test mapEdges() and change the value type to a parameterized custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
-				new ToCustomParametrizedTypeMapper()).getEdges();
-		List<Edge<Long, DummyCustomParameterizedType<Double>>> result= mappedEdges.collect();
-	
-		expectedResult = "1,2,(12.0,12)\n" +
-			"1,3,(13.0,13)\n" +
-			"2,3,(23.0,23)\n" +
-			"3,4,(34.0,34)\n" +
-			"3,5,(35.0,35)\n" + 
-			"4,5,(45.0,45)\n" + 
-			"5,1,(51.0,51)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
-		public Long map(Edge<Long, Long> edge) throws Exception {
-			return edge.getValue()+1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
-		public String map(Edge<Long, Long> edge) throws Exception {
-			return String.format("string(%d)", edge.getValue());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
-		public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
-			tupleValue.setFields(edge.getValue());
-			return tupleValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
-		public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
-			DummyCustomType dummyValue = new DummyCustomType();
-			dummyValue.setIntField(edge.getValue().intValue());						
-			return dummyValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, 
-		DummyCustomParameterizedType<Double>> {
-
-		public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-			dummyValue.setIntField(edge.getValue().intValue());
-			dummyValue.setTField(new Double(edge.getValue()));						
-			return dummyValue;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
deleted file mode 100644
index 677a03c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
-import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class MapVerticesITCase extends MultipleProgramsTestBase {
-
-	public MapVerticesITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testWithSameValue() throws Exception {
-		/*
-		 * Test mapVertices() keeping the same value type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();	
-        List<Vertex<Long, Long>> result= mappedVertices.collect();
-        
-		expectedResult = "1,2\n" +
-			"2,3\n" +
-			"3,4\n" +
-			"4,5\n" +
-			"5,6\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithStringValue() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to String
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
-        List<Vertex<Long, String>> result= mappedVertices.collect();
-
-		expectedResult = "1,one\n" +
-			"2,two\n" +
-			"3,three\n" +
-			"4,four\n" +
-			"5,five\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithtuple1Value() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a Tuple1
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
-        List<Vertex<Long, Tuple1<Long>>> result= mappedVertices.collect();
-
-		expectedResult = "1,(1)\n" +
-			"2,(2)\n" +
-			"3,(3)\n" +
-			"4,(4)\n" +
-			"5,(5)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithCustomType() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
-        List<Vertex<Long, DummyCustomType>> result= mappedVertices.collect();
-
-		expectedResult = "1,(T,1)\n" +
-			"2,(T,2)\n" +
-			"3,(T,3)\n" +
-			"4,(T,4)\n" +
-			"5,(T,5)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testWithCustomParametrizedType() throws Exception {
-		/*
-		 * Test mapVertices() and change the value type to a parameterized custom type
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-		
-		DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
-				new ToCustomParametrizedTypeMapper()).getVertices();
-        List<Vertex<Long, DummyCustomParameterizedType<Double>>> result= mappedVertices.collect();
-	
-		expectedResult = "1,(1.0,1)\n" +
-			"2,(2.0,2)\n" +
-			"3,(3.0,3)\n" +
-			"4,(4.0,4)\n" +
-			"5,(5.0,5)\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
-		public Long map(Vertex<Long, Long> value) throws Exception {
-			return value.getValue()+1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
-		public String map(Vertex<Long, Long> vertex) throws Exception {
-			String stringValue;
-			if (vertex.getValue() == 1) {
-				stringValue = "one";
-			}
-			else if (vertex.getValue() == 2) {
-				stringValue = "two";
-			}
-			else if (vertex.getValue() == 3) {
-				stringValue = "three";
-			}
-			else if (vertex.getValue() == 4) {
-				stringValue = "four";
-			}
-			else if (vertex.getValue() == 5) {
-				stringValue = "five";
-			}
-			else {
-				stringValue = "";
-			}
-			return stringValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
-		public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
-			tupleValue.setFields(vertex.getValue());
-			return tupleValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
-		public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
-			DummyCustomType dummyValue = new DummyCustomType();
-			dummyValue.setIntField(vertex.getValue().intValue());						
-			return dummyValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, 
-		DummyCustomParameterizedType<Double>> {
-		
-		public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
-			dummyValue.setIntField(vertex.getValue().intValue());
-			dummyValue.setTField(new Double(vertex.getValue()));						
-			return dummyValue;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
deleted file mode 100644
index 3bb19fa..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.ReduceEdgesFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
-
-	public ReduceOnEdgesMethodsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testLowestWeightOutNeighbor() throws Exception {
-		/*
-		 * Get the lowest-weight out-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
-
-	
-		expectedResult = "1,2\n" +
-				"2,3\n" +
-				"3,4\n" +
-				"4,5\n" +
-				"5,1\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testLowestWeightInNeighbor() throws Exception {
-		/*
-		 * Get the lowest-weight in-neighbor
-		 * for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
-
-		expectedResult = "1,5\n" +
-					"2,1\n" + 
-					"3,1\n" +
-					"4,3\n" + 
-					"5,3\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllOutNeighbors() throws Exception {
-		/*
-		 * Get the all the out-neighbors for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
-				graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
-
-		expectedResult = "1,2\n" +
-				"1,3\n" +
-				"2,3\n" +
-				"3,4\n" +
-				"3,5\n" +
-				"4,5\n" +
-				"5,1";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllOutNeighborsNoValue() throws Exception {
-		/*
-		 * Get the all the out-neighbors for each vertex except for the vertex with id 5.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
-				graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
-
-		expectedResult = "1,2\n" +
-				"1,3\n" +
-				"2,3\n" +
-				"3,4\n" +
-				"3,5\n" +
-				"4,5";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception {
-		/*
-		 * Get the all the out-neighbors for each vertex that have a value greater than two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
-				graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
-
-		expectedResult = "3,4\n" +
-				"3,5\n" +
-				"4,5\n" +
-				"5,1";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllInNeighbors() throws Exception {
-		/*
-		 * Get the all the in-neighbors for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
-				graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
-
-		expectedResult = "1,5\n" +
-				"2,1\n" +
-				"3,1\n" +
-				"3,2\n" +
-				"4,3\n" +
-				"5,3\n" +
-				"5,4";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllInNeighborsNoValue() throws Exception {
-		/*
-		 * Get the all the in-neighbors for each vertex except for the vertex with id 5.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
-				graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
-
-		expectedResult = "1,5\n" +
-				"2,1\n" +
-				"3,1\n" +
-				"3,2\n" +
-				"4,3";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception {
-		/*
-		 * Get the all the in-neighbors for each vertex that have a value greater than two.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
-				graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
-
-		expectedResult = "3,1\n" +
-				"3,2\n" +
-				"4,3\n" +
-				"5,3\n" +
-				"5,4";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllNeighbors() throws Exception {
-		/*
-		 * Get the all the neighbors for each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
-				graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
-
-		expectedResult = "1,2\n" +
-				"1,3\n" +
-				"1,5\n" +
-				"2,1\n" +
-				"2,3\n" +
-				"3,1\n" +
-				"3,2\n" +
-				"3,4\n" +
-				"3,5\n" +
-				"4,3\n" +
-				"4,5\n" +
-				"5,1\n" +
-				"5,3\n" +
-				"5,4";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllNeighborsNoValue() throws Exception {
-		/*
-		 * Get the all the neighbors for each vertex except for vertices with id 5 and 2.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
-				graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
-
-		expectedResult = "1,2\n" +
-				"1,3\n" +
-				"1,5\n" +
-				"3,1\n" +
-				"3,2\n" +
-				"3,4\n" +
-				"3,5\n" +
-				"4,3\n" +
-				"4,5";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testAllNeighborsWithValueGreaterThanFour() throws Exception {
-		/*
-		 * Get the all the neighbors for each vertex that have a value greater than four.
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
-				graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
-
-		expectedResult = "5,1\n" +
-				"5,3\n" +
-				"5,4";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testMaxWeightEdge() throws Exception {
-		/*
-		 * Get the maximum weight among all edges
-		 * of a vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-				graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithMaxEdgeWeight.collect();
-
-		expectedResult = "1,51\n" +
-				"2,23\n" + 
-				"3,35\n" +
-				"4,45\n" + 
-				"5,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testLowestWeightOutNeighborNoValue() throws Exception {
-		/*
-		 * Get the lowest-weight out of all the out-neighbors
-		 * of each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
-		List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
-
-		expectedResult = "1,12\n" +
-				"2,23\n" +
-				"3,34\n" +
-				"4,45\n" +
-				"5,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testLowestWeightInNeighborNoValue() throws Exception {
-		/*
-		 * Get the lowest-weight out of all the in-neighbors
-		 * of each vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN);
-		List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
-
-		expectedResult = "1,51\n" +
-				"2,12\n" +
-				"3,13\n" +
-				"4,34\n" +
-				"5,35\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testMaxWeightAllNeighbors() throws Exception {
-		/*
-		 * Get the maximum weight among all edges
-		 * of a vertex
-         */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
-				TestGraphUtils.getLongLongEdgeData(env), env);
-
-		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-				graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
-		List<Tuple2<Long,Long>> result = verticesWithMaxEdgeWeight.collect();
-
-		expectedResult = "1,51\n" +
-				"2,23\n" + 
-				"3,35\n" +
-				"4,45\n" + 
-				"5,51\n";
-		
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighborId = 0;
-
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() < weight) {
-					weight = edge.getValue();
-					minNeighborId = edge.getTarget();
-				}
-			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-			
-			long weight = Long.MIN_VALUE;
-
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() > weight) {
-					weight = edge.getValue();
-				}
-			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), weight));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction<Long> {
-
-		@Override
-		public Long reduceEdges(Long firstEdgeValue, Long secondEdgeValue) {
-			return Math.min(firstEdgeValue, secondEdgeValue);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long> {
-
-		@Override
-		public Long reduceEdges(Long firstEdgeValue, Long secondEdgeValue) {
-			return Math.max(firstEdgeValue, secondEdgeValue);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-			
-			long weight = Long.MAX_VALUE;
-			long minNeighborId = 0;
-			
-			for (Edge<Long, Long> edge: edges) {
-				if (edge.getValue() < weight) {
-					weight = edge.getValue();
-					minNeighborId = edge.getSource();
-				}
-			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectOutNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectOutNeighborsExcludeFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				if(edge.f0 != 5) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectOutNeighborsValueGreaterThanTwo implements
-			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for (Edge<Long, Long> edge: edges) {
-				if(v.getValue() > 2) {
-					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectInNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectInNeighborsExceptFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-
-			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				if(edge.f0 != 5) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectInNeighborsValueGreaterThanTwo implements
-			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for (Edge<Long, Long> edge: edges) {
-				if(v.getValue() > 2) {
-					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				if (edge.f0 == edge.f1.getTarget()) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
-				} else {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectNeighborsExceptFiveAndTwo implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				if(edge.f0 != 5 && edge.f0 != 2) {
-					if (edge.f0 == edge.f1.getTarget()) {
-						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
-					} else {
-						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
-					}
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectNeighborsValueGreaterThanFour implements
-			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for(Edge<Long, Long> edge : edges) {
-				if(v.getValue() > 4) {
-					if(v.getId().equals(edge.getTarget())) {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
-					} else {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
-					}
-				}
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
deleted file mode 100644
index ab10947..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.operations;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class ReduceOnEdgesWithExceptionITCase {
-
-	private static final int PARALLELISM = 4;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-
-	@BeforeClass
-	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Cluster shutdown caused an exception: " + t.getMessage());
-		}
-	}
-
-	/**
-	 * Test groupReduceOnEdges() with an edge having a srcId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnEdgesInvalidEdgeSrcId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
-					graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
-
-			verticesWithAllNeighbors.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	/**
-	 * Test groupReduceOnEdges() with an edge having a trgId that does not exist in the vertex DataSet
-	 */
-	@Test
-	public void testGroupReduceOnEdgesInvalidEdgeTrgId() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
-				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
-
-		try {
-			DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
-					graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
-
-			verticesWithAllNeighbors.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-			env.execute();
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-
-	@SuppressWarnings("serial")
-	private static final class SelectNeighborsValueGreaterThanFour implements
-			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
-
-		@Override
-		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
-								 Collector<Tuple2<Long, Long>> out) throws Exception {
-			for(Edge<Long, Long> edge : edges) {
-				if(v.getValue() > 4) {
-					if(v.getId().equals(edge.getTarget())) {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
-					} else {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
-					}
-				}
-			}
-		}
-	}
-}