You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:54 UTC

[16/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
new file mode 100644
index 0000000..bb3a131
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+	@Test
+	public void testTranslationPlainEdges() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			;
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_parallelism = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcMessaging = env.fromElements(1L);
+			DataSet<Long> bcUpdate = env.fromElements(1L);
+			
+			DataSet<Vertex<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+
+							public Tuple3<String, String, NullValue> map(
+									Tuple2<String, String> edge) {
+								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+						}), env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+				parameters.setName(ITERATION_NAME);
+				parameters.setParallelism(ITERATION_parallelism);
+				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+				result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+						NUM_ITERATIONS, parameters).getVertices();
+
+				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			;
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_parallelism = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcVar = env.fromElements(1L);
+			
+			DataSet<Vertex<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+
+							public Tuple3<String, String, NullValue> map(
+									Tuple2<String, String> edge) {
+								return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+						}), env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+				parameters.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+				parameters.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+				parameters.setName(ITERATION_NAME);
+				parameters.setParallelism(ITERATION_parallelism);
+				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = graph.runVertexCentricIteration(new UpdateFunction(), new MessageFunctionNoEdgeValue(),
+						NUM_ITERATIONS, parameters).getVertices();
+
+				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+
+		@Override
+		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
+	}
+	
+	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, NullValue> {
+
+		@Override
+		public void sendMessages(Vertex<String, Double> vertex) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
new file mode 100644
index 0000000..3fbd0bc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Runs a vertex-centric iteration in a collections environment and checks the
+ * superstep number seen by the user functions.
+ */
+@SuppressWarnings("serial")
+public class CollectionModeSuperstepITCase {
+
+	/**
+	 * Dummy iteration to test that the supersteps are correctly incremented
+	 * and can be retrieved from inside the update and messaging functions.
+	 * All vertices start with value 1 and increase their value by 1
+	 * in each iteration.
+	 */
+	@Test
+	public void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10);
+
+		result.getVertices().map(
+				new VertexToTuple2Map<Long, Long>()).output(
+						new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		env.execute();
+	}
+
+	/** Asserts that the vertex value equals the current superstep, then increments the value. */
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			long superstep = getSuperstepNumber();
+			// compare as primitives so that a failure reports both numbers
+			Assert.assertEquals(superstep, vertex.getValue().longValue());
+			setNewVertexValue(vertex.getValue() + 1);
+		}
+	}
+
+	/** Asserts that the vertex value equals the current superstep, then messages all neighbors. */
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			long superstep = getSuperstepNumber();
+			// compare as primitives so that a failure reports both numbers
+			Assert.assertEquals(superstep, vertex.getValue().longValue());
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	/** Replaces every vertex value with 1. */
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		@Override
+		public Long map(Vertex<Long, Long> value) {
+			return 1L;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
new file mode 100644
index 0000000..67d32a8
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.HashSet;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase {
+
+	public GatherSumApplyConfigurationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String expectedResult;
+
+	@Test
+	public void testRunWithConfiguration() throws Exception {
+		/*
+		 * Test Graph's runGatherSumApplyIteration when configuration parameters are provided
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+		// create the configuration object
+		GSAConfiguration parameters = new GSAConfiguration();
+
+		parameters.addBroadcastSetForGatherFunction("gatherBcastSet", env.fromElements(1, 2, 3));
+		parameters.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6));
+		parameters.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9));
+		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+		parameters.setOptNumVertices(true);
+
+		Graph<Long, Long, Long> res = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
+				new Apply(), 10, parameters);
+
+        DataSet<Vertex<Long, Long>> data = res.getVertices();
+        List<Vertex<Long, Long>> result= data.collect();
+
+		expectedResult = "1,11\n" +
+				"2,11\n" +
+				"3,11\n" +
+				"4,11\n" +
+				"5,11";
+		
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationConfiguration() throws Exception {
+
+		/*
+		 * Test name, parallelism and solutionSetUnmanaged parameters
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		GatherSumApplyIteration<Long, Long, Long, Long> iteration = GatherSumApplyIteration
+				.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
+						new DummySum(), new DummyApply(), 10);
+
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setName("gelly iteration");
+		parameters.setParallelism(2);
+		parameters.setSolutionSetUnmanagedMemory(true);
+
+		iteration.configure(parameters);
+
+		Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+		DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+        List<Vertex<Long, Long>> result= data.collect();
+        
+		expectedResult = "1,11\n" +
+				"2,12\n" +
+				"3,13\n" +
+				"4,14\n" +
+				"5,15";
+		
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationDefaultDirection() throws Exception {
+
+		/*
+		 * Test that if no direction parameter is given, the iteration works as before
+		 * (i.e. it gathers information from the IN edges and neighbors and the information is calculated for an OUT edge
+		 * Default direction parameter is OUT for the GatherSumApplyIterations)
+		 * When data is gathered from the IN edges the Gather Sum and Apply functions
+		 * set the set of vertices which have path to a vertex as the value of that vertex
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4)
+				.getVertices();
+
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 2, 3, 4, 5]\n"
+						+"2,[2]\n"
+						+"3,[1, 2, 3, 4, 5]\n"
+						+"4,[1, 2, 3, 4, 5]\n"
+						+"5,[1, 2, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationDirectionIN() throws Exception {
+
+		/*
+		 * Test that if the direction parameter IN is given, the iteration works as expected
+		 * (i.e. it gathers information from the OUT edges and neighbors and the information is calculated for an IN edge
+		 * When data is gathered from the OUT edges the Gather Sum and Apply functions
+		 * set the set of vertices which have path from a vertex as the value of that vertex
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		GSAConfiguration parameters = new GSAConfiguration();
+
+		parameters.setDirection(EdgeDirection.IN);
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+																								parameters)
+				.getVertices();
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 3, 4, 5]\n"
+				+"2,[1, 2, 3, 4, 5]\n"
+				+"3,[1, 3, 4, 5]\n"
+				+"4,[1, 3, 4, 5]\n"
+				+"5,[1, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationDirectionALL() throws Exception {
+
+		/*
+		 * Test that if the direction parameter OUT is given, the iteration works as expected
+		 * (i.e. it gathers information from both IN and OUT edges and neighbors
+		 * When data is gathered from the ALL edges the Gather Sum and Apply functions
+		 * set the set of vertices which are connected to a Vertex through some path as value of that vertex
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setDirection(EdgeDirection.ALL);
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+				parameters)
+				.getVertices();
+
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 2, 3, 4, 5]\n"
+				+"2,[1, 2, 3, 4, 5]\n"
+				+"3,[1, 2, 3, 4, 5]\n"
+				+"4,[1, 2, 3, 4, 5]\n"
+				+"5,[1, 2, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@SuppressWarnings("serial")
+	private static final class Gather extends GatherFunction<Long, Long, Long> {
+
+		@Override
+		public void preSuperstep() {
+
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("gatherBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0));
+			Assert.assertEquals(2, bcastSet.get(1));
+			Assert.assertEquals(3, bcastSet.get(2));
+
+			// test aggregator
+			if (getSuperstepNumber() == 2) {
+				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+
+				Assert.assertEquals(7, aggrValue);
+			}
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
+		}
+
+		public Long gather(Neighbor<Long, Long> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class Sum extends SumFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void preSuperstep() {
+
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("sumBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0));
+			Assert.assertEquals(5, bcastSet.get(1));
+			Assert.assertEquals(6, bcastSet.get(2));
+
+			// test aggregator
+			aggregator = getIterationAggregator("superstepAggregator");
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
+		}
+
+		public Long sum(Long newValue, Long currentValue) {
+			long superstep = getSuperstepNumber();
+			aggregator.aggregate(superstep);
+			return 0l;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class Apply extends ApplyFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void preSuperstep() {
+
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("applyBcastSet");
+			Assert.assertEquals(7, bcastSet.get(0));
+			Assert.assertEquals(8, bcastSet.get(1));
+			Assert.assertEquals(9, bcastSet.get(2));
+
+			// test aggregator
+			aggregator = getIterationAggregator("superstepAggregator");
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
+		}
+
+		public void apply(Long summedValue, Long origValue) {
+			long superstep = getSuperstepNumber();
+			aggregator.aggregate(superstep);
+			setResult(origValue + 1);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class DummyGather extends GatherFunction<Long, Long, Long> {
+
+		@Override
+		public void preSuperstep() {
+			// test number of vertices
+			// when the numVertices option is not set, -1 is returned
+			Assert.assertEquals(-1, getNumberOfVertices());
+		}
+
+		public Long gather(Neighbor<Long, Long> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class DummySum extends SumFunction<Long, Long, Long> {
+
+		public Long sum(Long newValue, Long currentValue) {
+			return 0l;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class DummyApply extends ApplyFunction<Long, Long, Long> {
+
+		public void apply(Long summedValue, Long origValue) {
+			setResult(origValue + 1);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		public Long map(Vertex<Long, Long> value) {
+			return 1l;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+			HashSet<Long> h = new HashSet<Long>();
+			h.add(value.getId());
+			return h;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
+		@Override
+		public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
+			HashSet<Long> set = currentSet;
+			for(Long l : newSet) {
+				set.add(l);
+			}
+			return set;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
+
+		@Override
+		public void apply(HashSet<Long> newValue, HashSet<Long> currentValue) {
+			newValue.addAll(currentValue);
+			if(newValue.size()>currentValue.size()) {
+				setResult(newValue);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..0213f02
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Runs the GSA library algorithms (connected components and single source shortest paths)
+ * on the default example edge data sets and compares the collected vertices with the
+ * expected output.
+ */
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+
+	public GatherSumApplyITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testConnectedComponents() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// every vertex starts with its own id as value (see InitMapperCC)
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+				new InitMapperCC(), env);
+
+		List<Vertex<Long, Long>> result = inputGraph.run(
+				new GSAConnectedComponents<Long, NullValue>(16)).collect();
+
+		// kept local: the expectation is only meaningful inside this test
+		final String expectedResult = "1,1\n" +
+				"2,1\n" +
+				"3,1\n" +
+				"4,1\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testSingleSourceShortestPaths() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// every vertex starts with the value 0.0 (see InitMapperSSSP)
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+				new InitMapperSSSP(), env);
+
+		// the first constructor argument (1L) is passed as the source vertex id
+		List<Vertex<Long, Double>> result = inputGraph.run(
+				new GSASingleSourceShortestPaths<Long>(1L, 16)).collect();
+
+		final String expectedResult = "1,0.0\n" +
+				"2,12.0\n" +
+				"3,13.0\n" +
+				"4,47.0\n" +
+				"5,48.0\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	/**
+	 * Initializes a vertex value with the vertex id itself.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitMapperCC implements MapFunction<Long, Long> {
+		@Override
+		public Long map(Long value) {
+			return value;
+		}
+	}
+
+	/**
+	 * Initializes every vertex value with 0.0, regardless of the vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 0.0;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
new file mode 100644
index 0000000..294926f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+/**
+ * Fixture data shared by the graph tests: small hard-coded vertex, edge and tuple
+ * collections, two dummy value types, and a helper that silences {@code System.out}.
+ */
+public class TestGraphUtils {
+
+	/**
+	 * @param env the environment used to create the data set
+	 * @return the vertices of {@link #getLongLongVertices()} as a data set
+	 */
+	public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
+			ExecutionEnvironment env) {
+
+		return env.fromCollection(getLongLongVertices());
+	}
+
+	/**
+	 * @param env the environment used to create the data set
+	 * @return the edges of {@link #getLongLongEdges()} as a data set
+	 */
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
+			ExecutionEnvironment env) {
+
+		return env.fromCollection(getLongLongEdges());
+	}
+
+	/**
+	 * The default edges with the second edge (1 -> 3) removed and an edge 13 -> 3 appended;
+	 * 13 is not an id of {@link #getLongLongVertices()}.
+	 */
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges = getLongLongEdges();
+
+		edges.remove(1);
+		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * The default edges with the first edge (1 -> 2) removed and an edge 3 -> 13 appended;
+	 * 13 is not an id of {@link #getLongLongVertices()}.
+	 */
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges =  getLongLongEdges();
+
+		edges.remove(0);
+		edges.add(new Edge<Long, Long>(3L, 13L, 13L));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * The default edges with three edges removed and three edges appended that use the
+	 * ids 12, 13 and 33, none of which is an id of {@link #getLongLongVertices()}.
+	 */
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges = getLongLongEdges();
+		// Each removal shifts the remaining elements left, so these three calls drop the
+		// edges originally at positions 0, 2 and 4 (1 -> 2, 2 -> 3 and 3 -> 5).
+		edges.remove(0);
+		edges.remove(1);
+		edges.remove(2);
+		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+		edges.add(new Edge<Long, Long>(1L, 12L, 12L));
+		edges.add(new Edge<Long, Long>(13L, 33L, 13L));
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Seven edges with String ids and Long values; same ids and values as
+	 * {@link #getLongLongEdges()}, with the ids written as strings.
+	 */
+	public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
+			ExecutionEnvironment env) {
+		List<Edge<String, Long>> edges = new ArrayList<Edge<String, Long>>();
+		edges.add(new Edge<String, Long>("1", "2", 12L));
+		edges.add(new Edge<String, Long>("1", "3", 13L));
+		edges.add(new Edge<String, Long>("2", "3", 23L));
+		edges.add(new Edge<String, Long>("3", "4", 34L));
+		edges.add(new Edge<String, Long>("3", "5", 35L));
+		edges.add(new Edge<String, Long>("4", "5", 45L));
+		edges.add(new Edge<String, Long>("5", "1", 51L));
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Five (Long, Long) pairs with the first fields 1, 2, 3, 4 and 6.
+	 */
+	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+		tuples.add(new Tuple2<Long, Long>(1L, 10L));
+		tuples.add(new Tuple2<Long, Long>(2L, 20L));
+		tuples.add(new Tuple2<Long, Long>(3L, 30L));
+		tuples.add(new Tuple2<Long, Long>(4L, 40L));
+		tuples.add(new Tuple2<Long, Long>(6L, 60L));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Seven (Long, Long) pairs; the first fields 1 and 3 occur twice.
+	 */
+	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+		tuples.add(new Tuple2<Long, Long>(1L, 10L));
+		tuples.add(new Tuple2<Long, Long>(1L, 20L));
+		tuples.add(new Tuple2<Long, Long>(2L, 30L));
+		tuples.add(new Tuple2<Long, Long>(3L, 40L));
+		tuples.add(new Tuple2<Long, Long>(3L, 50L));
+		tuples.add(new Tuple2<Long, Long>(4L, 60L));
+		tuples.add(new Tuple2<Long, Long>(6L, 70L));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Seven (Long, Long) pairs; the first fields 3 and 6 occur twice.
+	 */
+	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+		tuples.add(new Tuple2<Long, Long>(2L, 10L));
+		tuples.add(new Tuple2<Long, Long>(3L, 20L));
+		tuples.add(new Tuple2<Long, Long>(3L, 30L));
+		tuples.add(new Tuple2<Long, Long>(4L, 40L));
+		tuples.add(new Tuple2<Long, Long>(6L, 50L));
+		tuples.add(new Tuple2<Long, Long>(6L, 60L));
+		tuples.add(new Tuple2<Long, Long>(1L, 70L));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Seven (Long, Long, Long) triples over the ids 1, 2, 3, 4 and 6.
+	 */
+	public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
+			ExecutionEnvironment env) {
+		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<Tuple3<Long, Long, Long>>();
+		tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L));
+		tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L));
+		tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L));
+		tuples.add(new Tuple3<Long, Long, Long>(3L, 4L, 34L));
+		tuples.add(new Tuple3<Long, Long, Long>(3L, 6L, 36L));
+		tuples.add(new Tuple3<Long, Long, Long>(4L, 6L, 46L));
+		tuples.add(new Tuple3<Long, Long, Long>(6L, 1L, 61L));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Four pairs of a Long (1 to 4) and a {@link DummyCustomParameterizedType} value.
+	 */
+	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
+				DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Four pairs of a Long and a {@link DummyCustomParameterizedType} value; the first
+	 * field 1 occurs twice.
+	 */
+	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
+				DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Four pairs of a Long and a {@link DummyCustomParameterizedType} value; the first
+	 * field 3 occurs twice.
+	 */
+	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
+				DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * Four triples of two Longs and a {@link DummyCustomParameterizedType} value.
+	 */
+	public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
+			ExecutionEnvironment env) {
+		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples =
+				new ArrayList<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 2L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 3L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(2L, 3L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(3L, 4L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
+	/**
+	 * A graph with invalid vertex ids: the default vertices with the first vertex (id 1)
+	 * removed and a vertex with id 15 appended.
+	 */
+	public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
+			ExecutionEnvironment env) {
+		List<Vertex<Long, Long>> vertices = getLongLongVertices();
+
+		vertices.remove(0);
+		vertices.add(new Vertex<Long, Long>(15L, 1L));
+
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * A graph that has at least one vertex with no ingoing/outgoing edges
+	 */
+	public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
+		edges.add(new Edge<Long, Long>(1L, 4L, 14L));
+		edges.add(new Edge<Long, Long>(1L, 5L, 15L));
+		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
+		edges.add(new Edge<Long, Long>(3L, 5L, 35L));
+		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Function that produces an ArrayList of vertices. A new, modifiable list is built on
+	 * every call: five vertices with the ids 1 to 5, each carrying its id as value.
+	 */
+	public static final List<Vertex<Long, Long>> getLongLongVertices() {
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 1L));
+		vertices.add(new Vertex<Long, Long>(2L, 2L));
+		vertices.add(new Vertex<Long, Long>(3L, 3L));
+		vertices.add(new Vertex<Long, Long>(4L, 4L));
+		vertices.add(new Vertex<Long, Long>(5L, 5L));
+
+		return vertices;
+	}
+
+	/**
+	 * Five vertices with the ids 1 to 5, each carrying the value {@code true}.
+	 */
+	public static final List<Vertex<Long, Boolean>> getLongBooleanVertices() {
+		List<Vertex<Long, Boolean>> vertices = new ArrayList<Vertex<Long, Boolean>>();
+		vertices.add(new Vertex<Long, Boolean>(1L, true));
+		vertices.add(new Vertex<Long, Boolean>(2L, true));
+		vertices.add(new Vertex<Long, Boolean>(3L, true));
+		vertices.add(new Vertex<Long, Boolean>(4L, true));
+		vertices.add(new Vertex<Long, Boolean>(5L, true));
+
+		return vertices;
+	}
+
+	/**
+	 * Four edges in two groups that share no vertex: three edges among the ids 1, 2, 3
+	 * and one edge 4 -> 5.
+	 */
+	public static final DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
+		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
+		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
+		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Function that produces an ArrayList of edges. A new, modifiable list is built on
+	 * every call; the "invalid" edge data sets above rely on that when they modify it.
+	 */
+	public static final List<Edge<Long, Long>> getLongLongEdges() {
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
+		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
+		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
+		edges.add(new Edge<Long, Long>(3L, 4L, 34L));
+		edges.add(new Edge<Long, Long>(3L, 5L, 35L));
+		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
+		edges.add(new Edge<Long, Long>(5L, 1L, 51L));
+
+		return edges;
+	}
+
+	/**
+	 * Serializable dummy value type with an int and a boolean field.
+	 */
+	public static class DummyCustomType implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private int intField;
+		private boolean booleanField;
+
+		public DummyCustomType(int intF, boolean boolF) {
+			this.intField = intF;
+			this.booleanField = boolF;
+		}
+
+		/** Creates an instance with {@code intField = 0} and {@code booleanField = true}. */
+		public DummyCustomType() {
+			this.intField = 0;
+			this.booleanField = true;
+		}
+
+		public int getIntField() {
+			return intField;
+		}
+
+		public void setIntField(int intF) {
+			this.intField = intF;
+		}
+
+		public boolean getBooleanField() {
+			return booleanField;
+		}
+
+		/** @return {@code "(T,<int>)"} or {@code "(F,<int>)"} depending on the boolean field */
+		@Override
+		public String toString() {
+			return booleanField ? "(T," + intField + ")" : "(F," + intField + ")";
+		}
+	}
+
+	/**
+	 * Serializable dummy value type with an int field and a field of the type parameter.
+	 */
+	public static class DummyCustomParameterizedType<T> implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private int intField;
+		private T tField;
+
+		public DummyCustomParameterizedType(int intF, T tF) {
+			this.intField = intF;
+			this.tField = tF;
+		}
+
+		/** Creates an instance with {@code intField = 0} and a {@code null} tField. */
+		public DummyCustomParameterizedType() {
+			this.intField = 0;
+			this.tField = null;
+		}
+
+		public int getIntField() {
+			return intField;
+		}
+
+		public void setIntField(int intF) {
+			this.intField = intF;
+		}
+
+		public void setTField(T tF) {
+			this.tField = tF;
+		}
+
+		public T getTField() {
+			return tField;
+		}
+
+		/** @return {@code "(<tField>,<intField>)"}; a null tField is printed as {@code null} */
+		@Override
+		public String toString() {
+			// Concatenate the field directly instead of calling tField.toString(): the
+			// no-arg constructor leaves tField null, which used to throw a
+			// NullPointerException here.
+			return "(" + tField + "," + intField + ")";
+		}
+	}
+
+	/**
+	 * Method useful for suppressing sysout printing. It replaces {@code System.out} for the
+	 * whole JVM and does not restore the previous stream.
+	 */
+	public static void pipeSystemOutToNull() {
+		System.setOut(new PrintStream(new BlackholeOutputStream()));
+	}
+
+	/** Output stream that discards every byte written to it. */
+	private static final class BlackholeOutputStream extends java.io.OutputStream {
+		@Override
+		public void write(int b){}
+	}
+
+	/**
+	 * Edge data of the second graph used by the tests of the difference() method.
+	 *
+	 * @param env the environment used to create the data set
+	 * @return the edges of {@link #getLongLongEdgesForDifference()} as a data set
+	 */
+	public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference(
+			ExecutionEnvironment env){
+		return env.fromCollection(getLongLongEdgesForDifference());
+	}
+
+	/**
+	 * @param env the environment used to create the data set
+	 * @return the edges of {@link #getLongLongEdgesForDifference2()} as a data set
+	 */
+	public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference2(
+			ExecutionEnvironment env){
+		return env.fromCollection(getLongLongEdgesForDifference2());
+	}
+
+	/**
+	 * @param env the environment used to create the data set
+	 * @return the vertices of {@link #getVerticesForDifference()} as a data set
+	 */
+	public static final DataSet<Vertex<Long,Long>> getLongLongVertexDataDifference(
+			ExecutionEnvironment env)
+	{
+		return env.fromCollection(getVerticesForDifference());
+	}
+
+	/**
+	 * Three vertices with the ids 1, 3 and 6, each carrying its id as value.
+	 */
+	public static final List<Vertex<Long,Long>> getVerticesForDifference(){
+		List<Vertex<Long,Long>> vertices = new ArrayList<Vertex<Long,Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 1L));
+		vertices.add(new Vertex<Long, Long>(3L, 3L));
+		vertices.add(new Vertex<Long, Long>(6L, 6L));
+
+		return vertices;
+
+	}
+
+	/**
+	 * Three edges among the ids 1, 3 and 6.
+	 */
+	public static final List<Edge<Long, Long>> getLongLongEdgesForDifference() {
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
+		edges.add(new Edge<Long, Long>(1L, 6L, 26L));
+		edges.add(new Edge<Long, Long>(6L, 3L, 63L));
+		return edges;
+	}
+
+	/**
+	 * A single self-loop edge 6 -> 6.
+	 */
+	public static final List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
+		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		edges.add(new Edge<Long, Long>(6L, 6L, 66L));
+		return edges;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
new file mode 100644
index 0000000..0feb3fb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+
+@RunWith(Parameterized.class)
+public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * @param mode the execution mode, handed on unchanged to the test base class
+	 */
+	public VertexCentricConfigurationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	// scratch field: each test assigns its expected output here before comparing
+    private String expectedResult;
+
+	/**
+	 * Test Graph's runVertexCentricIteration when configuration parameters are provided
+	 */
+	@Test
+	public void testRunWithConfiguration() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new AssignOneMapper());
+
+		// broadcast sets, an aggregator and the number-of-vertices option
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
+		config.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
+		config.registerAggregator("superstepAggregator", new LongSumAggregator());
+		config.setOptNumVertices(true);
+
+		Graph<Long, Long, Long> iterated = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10, config);
+
+		List<Vertex<Long,Long>> collected = iterated.getVertices().collect();
+
+		expectedResult = "1,11\n" +
+						"2,11\n" +
+						"3,11\n" +
+						"4,11\n" +
+						"5,11";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test name, parallelism and solutionSetUnmanaged parameters
+	 */
+	@Test
+	public void testIterationConfiguration() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		VertexCentricIteration<Long, Long, Long, Long> iteration = VertexCentricIteration.withEdges(
+				TestGraphUtils.getLongLongEdgeData(env),
+				new DummyUpdateFunction(),
+				new DummyMessageFunction(),
+				10);
+
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.setName("gelly iteration");
+		config.setParallelism(2);
+		config.setSolutionSetUnmanagedMemory(true);
+
+		iteration.configure(config);
+
+		// the values set above must be readable from the configured iteration
+		Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+		List<Vertex<Long,Long>> collected = TestGraphUtils.getLongLongVertexData(env)
+				.runOperation(iteration)
+				.collect();
+
+		expectedResult = "1,11\n" +
+						"2,12\n" +
+						"3,13\n" +
+						"4,14\n" +
+						"5,15";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test Graph's runVertexCentricIteration when configuration parameters are not provided
+	 * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT.
+	 */
+	@Test
+	public void testDefaultConfiguration() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new AssignOneMapper());
+
+		Graph<Long, Long, Long> iterated = graph.runVertexCentricIteration(
+				new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
+
+		// compare as plain (id, value) tuples
+		List<Tuple2<Long, Long>> collected = iterated.getVertices()
+				.map(new VertexToTuple2Map<Long, Long>())
+				.collect();
+
+		expectedResult = "1,6\n" +
+						"2,6\n" +
+						"3,6\n" +
+						"4,6\n" +
+						"5,6";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if no direction parameter is given, the iteration works as before
+	 * (i.e. it collects messages from the in-neighbors and sends them to the out-neighbors)
+	 */
+	@Test
+	public void testIterationDefaultDirection() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// no configuration object is passed here
+		List<Vertex<Long, HashSet<Long>>> collected = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,[5]\n" +
+				"2,[1]\n" +
+				"3,[1, 2]\n" +
+				"4,[3]\n" +
+				"5,[3, 4]";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the direction parameter is set to IN,
+	 * messages are collected from the out-neighbors and sent to the in-neighbors.
+	 */
+	@Test
+	public void testIterationINDirection() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// the only configured option is the direction
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.setDirection(EdgeDirection.IN);
+
+		List<Vertex<Long, HashSet<Long>>> collected = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, config)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,[2, 3]\n" +
+				"2,[3]\n" +
+				"3,[4, 5]\n" +
+				"4,[5]\n" +
+				"5,[1]";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the direction parameter is set to ALL,
+	 * messages are collected from all the neighbors and sent to all the neighbors.
+	 */
+	@Test
+	public void testIterationALLDirection() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// the only configured option is the direction
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.setDirection(EdgeDirection.ALL);
+
+		List<Vertex<Long, HashSet<Long>>> collected = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, config)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,[2, 3, 5]\n" +
+				"2,[1, 3]\n" +
+				"3,[1, 2, 4, 5]\n" +
+				"4,[3, 5]\n" +
+				"5,[1, 3, 4]";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the number of vertices option is not set, -1 is returned as value.
+	 */
+	@Test
+	public void testNumVerticesNotSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+		// no configuration object is passed here
+		List<Vertex<Long, Long>> collected = graph
+				.runVertexCentricIteration(new UpdateFunctionNumVertices(), new DummyMessageFunction(), 2)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the degrees are set, they can be accessed in every superstep
+	 * inside the update function and the value
+	 * is correctly computed for degrees in the messaging function.
+	 */
+	@Test
+	public void testInDegreesSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+		// the only configured option is the degrees option
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.setOptDegrees(true);
+
+		List<Vertex<Long, Long>> collected = graph
+				.runVertexCentricIteration(new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, config)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,1\n" +
+				"2,1\n" +
+				"3,2\n" +
+				"4,1\n" +
+				"5,2";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the degrees option is not set, then -1 is returned as a value for in-degree.
+	 */
+	@Test
+	public void testInDegreesNotSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+		// no configuration object is passed here
+		List<Vertex<Long, Long>> collected = graph
+				.runVertexCentricIteration(new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	/**
+	 * Test that if the degrees are set, they can be accessed in every superstep
+	 * inside the update function and the value
+	 * is correctly computed for degrees in the messaging function.
+	 */
+	@Test
+	public void testOutDegreesSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+		// the only configured option is the degrees option
+		VertexCentricConfiguration config = new VertexCentricConfiguration();
+		config.setOptDegrees(true);
+
+		List<Vertex<Long, Long>> collected = graph
+				.runVertexCentricIteration(new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, config)
+				.getVertices()
+				.collect();
+
+		expectedResult = "1,2\n" +
+				"2,1\n" +
+				"3,2\n" +
+				"4,1\n" +
+				"5,1";
+
+		compareResultAsTuples(collected, expectedResult);
+	}
+
+	@Test
+	public void testOutDegreesNotSet() throws Exception {
+
+		/*
+		 * Test that if the degrees option is not set, then -1 is returned as a value for out-degree.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(), env);
+
+		// The update function must read the OUT-degree here; using the in-degree variant
+		// would only repeat testInDegreesNotSet and leave getOutDegree() unchecked.
+		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+				new UpdateFunctionOutDegrees(), new DummyMessageFunction(), 2).getVertices();
+
+		List<Vertex<Long, Long>> result = verticesWithDegrees.collect();
+
+		// one "vertexId,outDegree" line per vertex
+		expectedResult = "1,-1\n" +
+				"2,-1\n" +
+				"3,-1\n" +
+				"4,-1\n" +
+				"5,-1";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testDirectionALLAndDegrees() throws Exception {
+
+		/*
+		 * Count the neighbors of each vertex in a vertex-centric manner and check
+		 * that the count equals inDegree + outDegree.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Boolean, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongBooleanVertices(), TestGraphUtils.getLongLongEdges(), env);
+
+		// enable degrees and use both edge directions for messaging
+		VertexCentricConfiguration allDirectionsConfig = new VertexCentricConfiguration();
+		allDirectionsConfig.setOptDegrees(true);
+		allDirectionsConfig.setDirection(EdgeDirection.ALL);
+
+		Graph<Long, Boolean, Long> iterated = graph.runVertexCentricIteration(
+				new VertexUpdateNumNeighbors(), new IdMessenger(), 1, allDirectionsConfig);
+		List<Vertex<Long, Boolean>> result = iterated.getVertices().collect();
+
+		// one "vertexId,flag" line per vertex
+		expectedResult = "1,true\n" +
+				"2,true\n" +
+				"3,true\n" +
+				"4,true\n" +
+				"5,true";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	/**
+	 * Update function that checks the broadcast set, the aggregator and the number of
+	 * vertices before each superstep, and increments the vertex value by one.
+	 */
+	@SuppressWarnings("serial")
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void preSuperstep() {
+
+			// the "updateBcastSet" broadcast set must hold 1, 2, 3 in this order
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> broadcastValues =
+					(List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+			for (int i = 0; i < 3; i++) {
+				Assert.assertEquals(i + 1, broadcastValues.get(i));
+			}
+
+			// look up the aggregator used in updateVertex
+			aggregator = getIterationAggregator("superstepAggregator");
+
+			// the number of vertices must be 5
+			Assert.assertEquals(5, getNumberOfVertices());
+		}
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			// add the current superstep number to the aggregator
+			aggregator.aggregate(getSuperstepNumber());
+
+			long incremented = vertex.getValue() + 1;
+			setNewVertexValue(incremented);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+
+			// test number of vertices
+			Assert.assertEquals(-1, getNumberOfVertices());
+
+			// test degrees
+			Assert.assertEquals(-1, getInDegree());
+			Assert.assertEquals(-1, getOutDegree());
+
+			setNewVertexValue(vertex.getValue() + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void preSuperstep() {
+			
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0));
+			Assert.assertEquals(5, bcastSet.get(1));
+			Assert.assertEquals(6, bcastSet.get(2));
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
+			
+			// test aggregator
+			if (getSuperstepNumber() == 2) {
+				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+				Assert.assertEquals(5, aggrValue);
+			}
+		}
+
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			// test number of vertices
+			Assert.assertEquals(-1, getNumberOfVertices());
+
+			// test degrees
+			Assert.assertEquals(-1, getInDegree());
+			Assert.assertEquals(-1, getOutDegree());
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+				setNewVertexValue(getNumberOfVertices());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			setNewVertexValue(vertex.getValue() + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	/**
+	 * Messaging function that checks the in- and out-degree of the vertices with
+	 * ids 1 and 3 against fixed expected values, and sends the vertex value to all neighbors.
+	 */
+	@SuppressWarnings("serial")
+	public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			// Compare the id as a primitive long: Long.equals(Integer) is always false,
+			// so equals(1) / equals(3) would silently skip the degree checks below.
+			final long vertexId = vertex.getId();
+
+			if (vertexId == 1L) {
+				Assert.assertEquals(2, getOutDegree());
+				Assert.assertEquals(1, getInDegree());
+			}
+			else if (vertexId == 3L) {
+				Assert.assertEquals(2, getOutDegree());
+				Assert.assertEquals(2, getInDegree());
+			}
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	/**
+	 * Update function that replaces the content of the vertex's set with the
+	 * messages received in the current superstep.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
+			// drop whatever the set held before this superstep
+			vertex.getValue().clear();
+
+			for (long received : messages) {
+				vertex.getValue().add(received);
+			}
+
+			HashSet<Long> updatedSet = vertex.getValue();
+			setNewVertexValue(updatedSet);
+		}
+	}
+
+	/**
+	 * Update function that sets the vertex value to the value returned by getInDegree().
+	 */
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			setNewVertexValue(getInDegree());
+		}
+	}
+
+	/**
+	 * Update function that sets the vertex value to the value returned by getOutDegree().
+	 */
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			setNewVertexValue(getOutDegree());
+		}
+	}
+
+	/**
+	 * Update function that counts the received messages and sets the vertex value to
+	 * whether that count equals getInDegree() + getOutDegree().
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Boolean> vertex, MessageIterator<Long> messages) throws Exception {
+
+			long receivedMessages = 0;
+
+			// only the number of messages matters, not their content
+			for (@SuppressWarnings("unused") long ignoredMessage : messages) {
+				receivedMessages++;
+			}
+
+			boolean matchesDegreeSum = receivedMessages == (getInDegree() + getOutDegree());
+			setNewVertexValue(matchesDegreeSum);
+		}
+	}
+
+	/**
+	 * Update function that sets the vertex value to the sum of getInDegree() and getOutDegree().
+	 */
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			long degreeSum = getInDegree() + getOutDegree();
+			setNewVertexValue(degreeSum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class IdMessengerSrc extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+			for (Edge<Long, Long> edge : getEdges()) {
+				sendMessageTo(edge.getSource(), vertex.getId());
+			}
+		}
+	}
+
+	/**
+	 * Messaging function that sends the vertex id to the vertex at the other end of
+	 * every edge returned by getEdges(): to the edge source if the source differs
+	 * from this vertex, otherwise to the edge target.
+	 */
+	@SuppressWarnings("serial")
+	public static final class IdMessengerAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+			for (Edge<Long, Long> edge : getEdges()) {
+				// Ids are boxed Longs: compare by value. Using != would compare object
+				// references and only work for ids that happen to share a cached instance.
+				if (!edge.getSource().equals(vertex.getId())) {
+					sendMessageTo(edge.getSource(), vertex.getId());
+				} else {
+					sendMessageTo(edge.getTarget(), vertex.getId());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Messaging function that sends the vertex id to the vertex at the other end of
+	 * every edge returned by getEdges(): to the edge source if the source differs
+	 * from this vertex, otherwise to the edge target.
+	 */
+	@SuppressWarnings("serial")
+	public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
+			for (Edge<Long, Long> edge : getEdges()) {
+				// Ids are boxed Longs: compare by value. Using != would compare object
+				// references and only work for ids that happen to share a cached instance.
+				if (!edge.getSource().equals(vertex.getId())) {
+					sendMessageTo(edge.getSource(), vertex.getId());
+				} else {
+					sendMessageTo(edge.getTarget(), vertex.getId());
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class IdMessengerTrg extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+			for (Edge<Long, Long> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getId());
+			}
+		}
+	}
+
+	/**
+	 * Maps every vertex to the constant value 1.
+	 */
+	@SuppressWarnings("serial")
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		@Override
+		public Long map(Vertex<Long, Long> value) {
+			// upper-case suffix: a lower-case 'l' is easily misread as the digit 1
+			return 1L;
+		}
+	}
+
+	/**
+	 * Maps every vertex to a new, empty HashSet.
+	 */
+	@SuppressWarnings("serial")
+	public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+			HashSet<Long> emptySet = new HashSet<Long>();
+			return emptySet;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..b0bacc4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.ConnectedComponents;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public ConnectedComponentsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testConnectedComponentsExample() throws Exception {
+		ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
+		expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
new file mode 100644
index 0000000..183c429
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.EuclideanGraphWeighing;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
+
+	private String verticesPath;
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testGraphWeightingWeighing() throws Exception {
+		EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath});
+		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..c19411b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.IncrementalSSSP;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Tests for the {@link IncrementalSSSP} example: one run of the example program on the
+ * data from {@link IncrementalSSSPData}, and one run where the removed edge is not part
+ * of the SSSP edges.
+ */
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+	private String verticesPath;
+
+	private String edgesPath;
+
+	private String edgesInSSSPPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public IncrementalSSSPITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Creates the result file and writes the vertices, the edges and the SSSP edges
+	 * into temporary input files.
+	 */
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+		File edgesInSSSPFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+	}
+
+	/**
+	 * Runs the example program; the comparison with {@code expected} happens in {@link #after()}.
+	 */
+	@Test
+	public void testIncrementalSSSP() throws Exception {
+		IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
+				IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
+				IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
+		expected = IncrementalSSSPData.RESULTED_VERTICES;
+	}
+
+	/**
+	 * Removes an edge that is not an SSSP edge and expects the vertices to be written
+	 * out unchanged; the comparison with {@code expected} happens in {@link #after()}.
+	 */
+	@Test
+	public void testIncrementalSSSPNonSPEdge() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
+		DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
+		DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+		// the edge to be removed is a non-SP edge
+		Edge<Long, Double> edgeToBeRemoved = new Edge<Long, Double>(3L, 5L, 5.0);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+		// Assumption: all minimum weight paths are kept
+		Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+		// remove the edge; keep the returned graph, since discarding the result
+		// would leave 'graph' still containing the edge
+		// NOTE(review): assumes removeEdge returns the modified graph rather than mutating in place - confirm
+		graph = graph.removeEdge(edgeToBeRemoved);
+
+		// configure the iteration
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+			parameters.setDirection(EdgeDirection.IN);
+			parameters.setOptDegrees(true);
+
+			// run the vertex centric iteration to propagate info
+			Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(
+					new IncrementalSSSP.VertexDistanceUpdater(),
+					new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
+					IncrementalSSSPData.NUM_VERTICES, parameters);
+
+			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
+
+			resultedVertices.writeAsCsv(resultPath, "\n", ",");
+			env.execute();
+		} else {
+			// the removed edge is not an SSSP edge: write the vertices out as they are
+			vertices.writeAsCsv(resultPath, "\n", ",");
+			env.execute();
+		}
+
+		expected = IncrementalSSSPData.VERTICES;
+	}
+
+	/**
+	 * Compares the content written to the result path with the expected lines.
+	 */
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
new file mode 100644
index 0000000..294a756
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasure;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testJaccardSimilarityMeasureExample() throws Exception {
+		JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath});
+		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}