You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:53 UTC

[15/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
new file mode 100644
index 0000000..5aa9f26
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.MusicProfiles;
+import org.apache.flink.graph.example.utils.MusicProfilesData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Integration test for the {@link MusicProfiles} example: the program is run on
+ * the sample input from {@link MusicProfilesData} and both of its output files
+ * (top songs and communities) are verified in {@link #after()}.
+ */
+@RunWith(Parameterized.class)
+public class MusicProfilesITCase extends MultipleProgramsTestBase {
+
+	/** Number of sorted community lines inspected by {@link #after()} (user_1 .. user_5). */
+	private static final int EXPECTED_USERS = 5;
+
+	/**
+	 * Offset at which the community part of a result line starts.
+	 * NOTE(review): presumably skips the "user_N" id plus one delimiter character;
+	 * confirm against the output format of MusicProfiles.
+	 */
+	private static final int COMMUNITY_OFFSET = 7;
+
+	private String tripletsPath;
+
+	private String mismatchesPath;
+
+	private String topSongsResultPath;
+
+	private String communitiesResultPath;
+
+	private String expectedTopSongs;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public MusicProfilesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Creates the two output files and writes the triplets and mismatches sample
+	 * data into temporary input files; all paths are stored as URI strings.
+	 */
+	@Before
+	public void before() throws Exception {
+		topSongsResultPath = tempFolder.newFile().toURI().toString();
+		communitiesResultPath = tempFolder.newFile().toURI().toString();
+
+		File tripletsFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8);
+		tripletsPath = tripletsFile.toURI().toString();
+
+		File mismatchesFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8);
+		mismatchesPath = mismatchesFile.toURI().toString();
+	}
+
+	/**
+	 * Runs the example program; the actual verification happens in {@link #after()}.
+	 */
+	@Test
+	public void testMusicProfilesExample() throws Exception {
+		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
+				MusicProfilesData.MAX_ITERATIONS + ""});
+		expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
+	}
+
+	/**
+	 * Verifies the top-songs output line by line and checks that the community
+	 * output groups users 1-2 and users 3-4-5 together.
+	 */
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
+
+		ArrayList<String> list = new ArrayList<String>();
+		readAllResultLines(list, communitiesResultPath, new String[]{}, false);
+
+		String[] result = list.toArray(new String[list.size()]);
+		// sorting puts the lines in user order so they can be addressed by index
+		Arrays.sort(result);
+
+		// fail with a readable message instead of an ArrayIndexOutOfBoundsException
+		Assert.assertTrue("expected at least " + EXPECTED_USERS + " community result lines but found "
+				+ result.length, result.length >= EXPECTED_USERS);
+
+		// check that user_1 and user_2 are in the same community
+		Assert.assertEquals("users 1 and 2 are not in the same community",
+				communityOf(result[0]), communityOf(result[1]));
+
+		// check that user_3, user_4 and user_5 are in the same community
+		Assert.assertEquals("users 3 and 4 are not in the same community",
+				communityOf(result[2]), communityOf(result[3]));
+		Assert.assertEquals("users 4 and 5 are not in the same community",
+				communityOf(result[3]), communityOf(result[4]));
+	}
+
+	/**
+	 * Returns the part of a community result line starting at {@link #COMMUNITY_OFFSET},
+	 * failing the test with the offending line if it is too short.
+	 */
+	private static String communityOf(String line) {
+		Assert.assertTrue("malformed community result line: '" + line + "'",
+				line.length() >= COMMUNITY_OFFSET);
+		return line.substring(COMMUNITY_OFFSET);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
new file mode 100644
index 0000000..d8f8c8f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
+import org.apache.flink.graph.example.SingleSourceShortestPaths;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs the {@link SingleSourceShortestPaths} and {@link GSASingleSourceShortestPaths}
+ * example programs on the same edge file and compares the output of each with the
+ * reference result from {@link SingleSourceShortestPathsData}.
+ */
+@RunWith(Parameterized.class)
+public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private String edgesPath;
+    private String resultPath;
+    private String expected;
+
+    public SingleSourceShortestPathsITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /**
+     * Creates the result file and an input file holding the sample edges.
+     */
+    @Before
+    public void before() throws Exception {
+        File resultFile = tempFolder.newFile();
+        resultPath = resultFile.toURI().toString();
+
+        File edgesFile = tempFolder.newFile();
+        Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
+        edgesPath = edgesFile.toURI().toString();
+    }
+
+    @Test
+    public void testSSSPExample() throws Exception {
+        SingleSourceShortestPaths.main(programArguments());
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    @Test
+    public void testGSASSSPExample() throws Exception {
+        GSASingleSourceShortestPaths.main(programArguments());
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    /**
+     * Compares the lines written to the result file with the expected output
+     * recorded by the test method.
+     */
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    /**
+     * Builds the argument array shared by both example programs:
+     * source vertex id, edges path, result path and the literal "10".
+     */
+    private String[] programArguments() {
+        String sourceVertex = String.valueOf(SingleSourceShortestPathsData.SRC_VERTEX_ID);
+        return new String[]{sourceVertex, edgesPath, resultPath, "10"};
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..421eaa9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetection;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Tests a single iteration of {@link CommunityDetection} on the edge sets provided
+ * by {@link CommunityDetectionData}, with every vertex initially labelled by its own id.
+ */
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+	public CommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * One iteration on the simple edge data set.
+	 */
+	@Test
+	public void testSingleIteration() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> graph = Graph.fromDataSet(
+				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+		List<Vertex<Long, Long>> communities = detectOnce(graph);
+
+		compareResultAsTuples(communities, CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION);
+	}
+
+	/**
+	 * One iteration on the edge data set in which a tie must be broken.
+	 */
+	@Test
+	public void testTieBreaker() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> graph = Graph.fromDataSet(
+				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+		List<Vertex<Long, Long>> communities = detectOnce(graph);
+
+		compareResultAsTuples(communities, CommunityDetectionData.COMMUNITIES_WITH_TIE);
+	}
+
+	/**
+	 * Runs community detection with one iteration and the delta from
+	 * {@link CommunityDetectionData} on the given graph and collects the resulting vertices.
+	 */
+	private static List<Vertex<Long, Long>> detectOnce(Graph<Long, Long, Double> graph) throws Exception {
+		CommunityDetection<Long> algorithm = new CommunityDetection<Long>(1, CommunityDetectionData.DELTA);
+		return graph.run(algorithm).getVertices().collect();
+	}
+
+	/**
+	 * Vertex value initializer: uses the vertex id as its initial label.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitLabels implements MapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long id) {
+			return id;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..9eb7387
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 9487520347802987L;
+
+	private static final int NUM_VERTICES = 1000;
+
+	private static final int NUM_EDGES = 10000;
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+
+		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
+
+		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+		DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100));
+
+		result.writeAsCsv(resultPath, "\n", " ");
+		env.execute();
+	}
+
+	/**
+	 * A map function that takes a Long value and creates a 2-tuple out of it:
+	 * <pre>(Long value) -> (value, value)</pre>
+	 */
+	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
+		@Override
+		public Vertex<Long, Long> map(Long value) {
+			return new Vertex<Long, Long>(value, value);
+		}
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+
+	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
+		public Edge<Long, NullValue> map(String value) {
+			String[] nums = value.split(" ");
+			return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+					NullValue.getInstance());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..8785b0d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.LabelPropagationData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Tests a single iteration of {@link LabelPropagation} on the vertex and edge
+ * sets provided by {@link LabelPropagationData}.
+ */
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+	public LabelPropagationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * One iteration on the default vertex and edge data sets.
+	 */
+	@Test
+	public void testSingleIteration() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(
+				LabelPropagationData.getDefaultVertexSet(env),
+				LabelPropagationData.getDefaultEdgeDataSet(env),
+				env);
+
+		List<Vertex<Long, Long>> labels = propagateOnce(graph);
+
+		compareResultAsTuples(labels, LabelPropagationData.LABELS_AFTER_1_ITERATION);
+	}
+
+	/**
+	 * One iteration on the data sets in which a tie must be broken.
+	 */
+	@Test
+	public void testTieBreaker() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(
+				LabelPropagationData.getTieVertexSet(env),
+				LabelPropagationData.getTieEdgeDataSet(env),
+				env);
+
+		List<Vertex<Long, Long>> labels = propagateOnce(graph);
+
+		compareResultAsTuples(labels, LabelPropagationData.LABELS_WITH_TIE);
+	}
+
+	/**
+	 * Runs label propagation with one iteration on the given graph and collects the result.
+	 */
+	private static List<Vertex<Long, Long>> propagateOnce(Graph<Long, Long, NullValue> graph) throws Exception {
+		LabelPropagation<Long, NullValue> algorithm = new LabelPropagation<Long, NullValue>(1);
+		return graph.run(algorithm).collect();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
new file mode 100644
index 0000000..94c7713
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PageRankData;
+import org.apache.flink.graph.library.GSAPageRank;
+import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests {@link PageRank} and {@link GSAPageRank} with three iterations on the default
+ * edge set of {@link PageRankData}; ranks are compared against
+ * {@link PageRankData#RANKS_AFTER_3_ITERATIONS} within a tolerance.
+ */
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	/** Dampening factor handed to both algorithms. */
+	private static final double BETA = 0.85;
+
+	/** Number of iterations handed to both algorithms. */
+	private static final int ITERATIONS = 3;
+
+	/** Vertex count passed to the three-argument constructors. */
+	private static final int NUM_VERTICES = 5;
+
+	/** Maximum permitted absolute difference between an expected and a computed rank. */
+	private static final double DELTA = 0.01;
+
+	public PageRankITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(BETA, ITERATIONS))
+				.collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(BETA, ITERATIONS))
+				.collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(BETA, NUM_VERTICES, ITERATIONS))
+				.collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(BETA, NUM_VERTICES, ITERATIONS))
+				.collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	/**
+	 * Compares computed ranks with the expected ones.
+	 *
+	 * <p>Both sides are brought into "id,rank" line form and sorted as strings so that
+	 * lines can be paired by index. The test fails if the number of lines differs, if a
+	 * pair refers to different vertex ids, or if two ranks differ by {@code delta} or more.
+	 *
+	 * @param result the vertices produced by the algorithm under test
+	 * @param expectedResult newline-separated "id,rank" lines
+	 * @param delta exclusive upper bound for the absolute rank difference
+	 */
+	private void compareWithDelta(List<Vertex<Long, Double>> result,
+			String expectedResult, double delta) {
+
+		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+		String[] resultArray = new String[result.size()];
+		int next = 0;
+		for (Vertex<Long, Double> v : result) {
+			resultArray[next++] = v.f0.toString() + "," + v.f1.toString();
+		}
+
+		Arrays.sort(expected);
+		Arrays.sort(resultArray);
+
+		// without this check a short result throws ArrayIndexOutOfBoundsException
+		// and surplus result lines would go unnoticed
+		Assert.assertEquals("Unexpected number of ranked vertices",
+				expected.length, resultArray.length);
+
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(",");
+			String[] resultFields = resultArray[i].split(",");
+
+			// make sure the two lines paired by index describe the same vertex
+			Assert.assertEquals("Vertex ids differ at sorted position " + i,
+					expectedFields[0].trim(), resultFields[0].trim());
+
+			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+			double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+			Assert.assertTrue("Values differ by more than the permissible delta for vertex "
+					+ expectedFields[0] + ": expected " + expectedPayLoad + " but was " + resultPayLoad,
+					Math.abs(expectedPayLoad - resultPayLoad) < delta);
+		}
+	}
+
+	/**
+	 * Vertex value initializer: assigns the value 1.0 to every vertex.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitMapper implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 1.0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
new file mode 100644
index 0000000..1d9ab9f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.TriangleCountData;
+import org.apache.flink.graph.library.GSATriangleCount;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Tests {@link GSATriangleCount} on the undirected version of the default edge set
+ * of {@link TriangleCountData}.
+ */
+@RunWith(Parameterized.class)
+public class TriangleCountITCase extends MultipleProgramsTestBase {
+
+	public TriangleCountITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Checks that the first element of the collected result equals the triangle count
+	 * recorded in {@link TriangleCountData#RESULTED_NUMBER_OF_TRIANGLES}.
+	 */
+	@Test
+	public void testGSATriangleCount() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env).getUndirected();
+
+		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
+		String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
+
+		// report an empty result as a test failure rather than an IndexOutOfBoundsException
+		Assert.assertFalse("GSATriangleCount returned no result", numberOfTriangles.isEmpty());
+
+		// JUnit expects (expected, actual); the reversed order produced a misleading message
+		Assert.assertEquals("Unexpected number of triangles",
+				Integer.parseInt(expectedResult), numberOfTriangles.get(0).intValue());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
new file mode 100644
index 0000000..b2744f9
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the degree operations of {@link Graph} ({@code outDegrees()}, {@code inDegrees()}
+ * and {@code getDegrees()}) on the sample graphs from {@link TestGraphUtils}.
+ * Expected values are "vertexId,degree" lines.
+ */
+@RunWith(Parameterized.class)
+public class DegreesITCase extends MultipleProgramsTestBase {
+
+	public DegreesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Test outDegrees()
+	 */
+	@Test
+	public void testOutDegrees() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = defaultGraph(env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.outDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,2\n2,1\n3,2\n4,1\n5,1\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Test outDegrees() no outgoing edges
+	 */
+	@Test
+	public void testOutDegreesWithNoOutEdges() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = zeroDegreeGraph(env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.outDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,3\n2,1\n3,1\n4,1\n5,0\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Test inDegrees()
+	 */
+	@Test
+	public void testInDegrees() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = defaultGraph(env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.inDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,1\n2,1\n3,2\n4,1\n5,2\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Test inDegrees() no ingoing edge
+	 */
+	@Test
+	public void testInDegreesWithNoInEdge() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = zeroDegreeGraph(env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.inDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,0\n2,1\n3,1\n4,1\n5,3\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Test getDegrees()
+	 */
+	@Test
+	public void testGetDegrees() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = defaultGraph(env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,3\n2,2\n3,4\n4,2\n5,3\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Test with disconnected data. Despite the method name, the operation
+	 * exercised here is outDegrees(), and the expected values are out-degrees.
+	 */
+	@Test
+	public void testGetDegreesWithDisconnectedData() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, NullValue, Long> graph =
+				Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> degrees = graph.outDegrees();
+		List<Tuple2<Long, Long>> collected = degrees.collect();
+
+		String expected = "1,2\n2,1\n3,0\n4,1\n5,0\n";
+		compareResultAsTuples(collected, expected);
+	}
+
+	/**
+	 * Builds the graph from the Long/Long vertex data and the Long/Long edge data.
+	 */
+	private static Graph<Long, Long, Long> defaultGraph(ExecutionEnvironment env) {
+		return Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+	}
+
+	/**
+	 * Builds the graph from the Long/Long vertex data and the edge data with a zero-degree vertex.
+	 */
+	private static Graph<Long, Long, Long> zeroDegreeGraph(ExecutionEnvironment env) {
+		return Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env),
+				env);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
new file mode 100644
index 0000000..955122f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Checks that the degree computations of a {@link Graph} make the job fail when an edge
+ * refers to a source or target id that does not exist in the vertex DataSet.
+ *
+ * <p>A {@link ForkableFlinkMiniCluster} is started once for the whole class; every test
+ * submits its job to it through a remote execution environment.
+ */
+public class DegreesWithExceptionITCase {
+
+	private static final int PARALLELISM = 4;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	/**
+	 * Starts the shared mini cluster with {@link #PARALLELISM} task slots.
+	 */
+	@BeforeClass
+	public static void setupCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Error starting test cluster: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Stops the shared mini cluster, if one was created.
+	 */
+	@AfterClass
+	public static void tearDownCluster() {
+		// @AfterClass also runs when @BeforeClass failed. Without this guard a failed
+		// start-up would additionally be reported as a NullPointerException-based
+		// shutdown failure, which only obscures the real cause.
+		if (cluster == null) {
+			return;
+		}
+
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	/**
+	 * Creates a remote environment that submits to the shared mini cluster, using
+	 * {@link #PARALLELISM} as parallelism and with sysout logging disabled.
+	 */
+	private static ExecutionEnvironment createTestEnvironment() {
+		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
+		return env;
+	}
+
+	/**
+	 * Test outDegrees() with an edge having a srcId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testOutDegreesInvalidEdgeSrcId() throws Exception {
+
+		final ExecutionEnvironment env = createTestEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
+
+		try {
+			graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// fail() throws an AssertionError, which the catch block below does not intercept
+			fail("graph.outDegrees() did not fail.");
+		} catch (Exception expected) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testInDegreesInvalidEdgeTrgId() throws Exception {
+
+		final ExecutionEnvironment env = createTestEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
+
+		try {
+			graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// fail() throws an AssertionError, which the catch block below does not intercept
+			fail("graph.inDegrees() did not fail.");
+		} catch (Exception expected) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testGetDegreesInvalidEdgeTrgId() throws Exception {
+
+		final ExecutionEnvironment env = createTestEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
+
+		try {
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// fail() throws an AssertionError, which the catch block below does not intercept
+			fail("graph.getDegrees() did not fail.");
+		} catch (Exception expected) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testGetDegreesInvalidEdgeSrcId() throws Exception {
+
+		final ExecutionEnvironment env = createTestEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
+
+		try {
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// fail() throws an AssertionError, which the catch block below does not intercept
+			fail("graph.getDegrees() did not fail.");
+		} catch (Exception expected) {
+			// We expect the job to fail with an exception
+		}
+	}
+
+	/**
+	 * Test getDegrees() with an edge having a srcId and a trgId that does not exist in the vertex DataSet
+	 */
+	@Test
+	public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
+
+		final ExecutionEnvironment env = createTestEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
+
+		try {
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			env.execute();
+
+			// fail() throws an AssertionError, which the catch block below does not intercept
+			fail("graph.getDegrees() did not fail.");
+		} catch (Exception expected) {
+			// We expect the job to fail with an exception
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
new file mode 100644
index 0000000..5a64dd7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the {@code Graph.fromCollection(...)} variants by collecting the edges or vertices
+ * of the created graph and comparing them with the expected tuples.
+ */
+@RunWith(Parameterized.class)
+public class FromCollectionITCase extends MultipleProgramsTestBase {
+
+	private String expectedResult;
+
+	public FromCollectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Test fromCollection(vertices, edges): the edges of the graph are compared.
+	 */
+	@Test
+	public void testFromCollectionVerticesEdges() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(),
+				env);
+
+		DataSet<Edge<Long, Long>> edges = graph.getEdges();
+		List<Edge<Long, Long>> collectedEdges = edges.collect();
+
+		expectedResult = "1,2,12\n"
+				+ "1,3,13\n"
+				+ "2,3,23\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+
+		compareResultAsTuples(collectedEdges, expectedResult);
+	}
+
+	/**
+	 * Test fromCollection(edges) with no initial value for the vertices.
+	 */
+	@Test
+	public void testFromCollectionEdgesNoInitialValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongEdges(), env);
+
+		DataSet<Vertex<Long, NullValue>> vertices = graph.getVertices();
+		List<Vertex<Long, NullValue>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,(null)\n"
+				+ "2,(null)\n"
+				+ "3,(null)\n"
+				+ "4,(null)\n"
+				+ "5,(null)\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test fromCollection(edges) with vertices initialised by a function that takes the id
+	 * and doubles it.
+	 */
+	@Test
+	public void testFromCollectionEdgesWithInitialValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(
+				TestGraphUtils.getLongLongEdges(), new InitVerticesMapper(), env);
+
+		DataSet<Vertex<Long, Long>> vertices = graph.getVertices();
+		List<Vertex<Long, Long>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,2\n"
+				+ "2,4\n"
+				+ "3,6\n"
+				+ "4,8\n"
+				+ "5,10\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Vertex initialiser that returns twice the vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitVerticesMapper implements MapFunction<Long, Long> {
+		@Override
+		public Long map(Long vertexId) {
+			return vertexId * 2;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
new file mode 100644
index 0000000..22a5151
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests graph creation through {@code Graph.fromDataSet(...)} with and without a vertex
+ * value mapper, and {@code Graph.validate(...)} with an {@link InvalidVertexIdsValidator}.
+ */
+@RunWith(Parameterized.class)
+public class GraphCreationITCase extends MultipleProgramsTestBase {
+
+	private String expectedResult;
+
+	public GraphCreationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Test create() with edge dataset and no vertex values.
+	 */
+	@Test
+	public void testCreateWithoutVertexValues() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, Long> graph =
+				Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Vertex<Long, NullValue>> vertices = graph.getVertices();
+		List<Vertex<Long, NullValue>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,(null)\n"
+				+ "2,(null)\n"
+				+ "3,(null)\n"
+				+ "4,(null)\n"
+				+ "5,(null)\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test create() with edge dataset and a mapper that assigns the id as value.
+	 */
+	@Test
+	public void testCreateWithMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignIdAsValueMapper(), env);
+
+		DataSet<Vertex<Long, Long>> vertices = graph.getVertices();
+		List<Vertex<Long, Long>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,1\n"
+				+ "2,2\n"
+				+ "3,3\n"
+				+ "4,4\n"
+				+ "5,5\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test create() with edge dataset and a mapper that assigns a parametrized custom
+	 * vertex value.
+	 */
+	@Test
+	public void testCreateWithCustomVertexValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
+
+		DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> vertices = graph.getVertices();
+		List<Vertex<Long, DummyCustomParameterizedType<Double>>> collectedVertices =
+				vertices.collect();
+
+		expectedResult = "1,(2.0,0)\n"
+				+ "2,(4.0,1)\n"
+				+ "3,(6.0,2)\n"
+				+ "4,(8.0,3)\n"
+				+ "5,(10.0,4)\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test validate(): the textual form of the validation result is compared with "true".
+	 */
+	@Test
+	public void testValidate() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
+		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+		Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+		// wrap the single validation outcome in a list so that it can be compared as text
+		List<String> outcome = new LinkedList<String>();
+		outcome.add(valid.toString());
+
+		expectedResult = "true";
+
+		compareResultAsText(outcome, expectedResult);
+	}
+
+	/**
+	 * Test validate() - invalid vertex ids: the textual form of the validation result is
+	 * compared with "false".
+	 */
+	@Test
+	public void testValidateWithInvalidIds() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
+		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+		Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+		// wrap the single validation outcome in a list so that it can be compared as text
+		List<String> outcome = new LinkedList<String>();
+		outcome.add(valid.toString());
+
+		expectedResult = "false\n";
+
+		compareResultAsText(outcome, expectedResult);
+	}
+
+	/**
+	 * Vertex initialiser that returns the vertex id unchanged.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
+		@Override
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	/**
+	 * Vertex initialiser that sets the int field to {@code id - 1} and the generic field to
+	 * {@code id * 2.0}. The same {@code dummyValue} instance is updated and returned on
+	 * every call.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignCustomVertexValueMapper implements
+			MapFunction<Long, DummyCustomParameterizedType<Double>> {
+
+		DummyCustomParameterizedType<Double> dummyValue =
+				new DummyCustomParameterizedType<Double>();
+
+		@Override
+		public DummyCustomParameterizedType<Double> map(Long vertexId) {
+			dummyValue.setIntField(vertexId.intValue() - 1);
+			dummyValue.setTField(vertexId * 2.0);
+			return dummyValue;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
new file mode 100644
index 0000000..99c66ec
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import com.google.common.base.Charsets;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+/**
+ * Tests {@code Graph.fromCsvReader(...)} against temporary CSV files written by
+ * {@link #createTempFile(String)}; the triplets of the created graph are compared with the
+ * expected tuples.
+ */
+@RunWith(Parameterized.class)
+public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
+
+	public GraphCreationWithCsvITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String expectedResult;
+
+	@Test
+	public void testCreateWithCsvFile() throws Exception {
+		/*
+		 * Test with two Csv files one with Vertex Data and one with Edges data
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent =  "1,1\n"+
+				"2,2\n"+
+				"3,3\n";
+		final FileInputSplit split = createTempFile(fileContent);
+		final String fileContent2 =  "1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+		final FileInputSplit split2 = createTempFile(fileContent2);
+
+		Graph<Long, Long, String> graph = Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env)
+				.types(Long.class, Long.class, String.class);
+
+		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,1,2,ot\n" +
+				"3,2,3,2,tt\n" +
+				"3,1,3,1,to\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testCsvWithNullEdge() throws Exception {
+		/*
+		 * Test fromCsvReader with edge and vertex path and NullValue for the edge values
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String vertexFileContent = "1,one\n"+
+				"2,two\n"+
+				"3,three\n";
+		final String edgeFileContent = "1,2\n"+
+				"3,2\n"+
+				"3,1\n";
+		final FileInputSplit split = createTempFile(vertexFileContent);
+		final FileInputSplit edgeSplit = createTempFile(edgeFileContent);
+
+		Graph<Long, String, NullValue> graph = Graph.fromCsvReader(split.getPath().toString(), edgeSplit.getPath().toString(),
+				env).vertexTypes(Long.class, String.class);
+
+		List<Triplet<Long, String, NullValue>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,one,two,(null)\n"+
+				"3,2,three,two,(null)\n"+
+				"3,1,three,one,(null)\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testCsvWithConstantValueMapper() throws Exception {
+		/*
+		 * Test fromCsvReader with edge path and a mapper that assigns a Double constant as value
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent =  "1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+		final FileInputSplit split = createTempFile(fileContent);
+
+		Graph<Long, Double, String> graph = Graph.fromCsvReader(split.getPath().toString(),
+				new AssignDoubleValueMapper(), env).types(Long.class, Double.class, String.class);
+
+		List<Triplet<Long, Double, String>> result = graph.getTriplets().collect();
+		expectedResult = "1,2,0.1,0.1,ot\n" + "3,1,0.1,0.1,to\n" + "3,2,0.1,0.1,tt\n";
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testCreateWithOnlyEdgesCsvFile() throws Exception {
+		/*
+		 * Test with a single Csv file containing Edges data.
+		 * Also tests the configuration method ignoreFirstLineEdges()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent2 =  "header\n1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+
+		final FileInputSplit split2 = createTempFile(fileContent2);
+		Graph<Long, NullValue, String> graph= Graph.fromCsvReader(split2.getPath().toString(), env)
+				.ignoreFirstLineEdges()
+				.ignoreCommentsVertices("hi")
+				.edgeTypes(Long.class, String.class);
+
+		List<Triplet<Long, NullValue, String>> result = graph.getTriplets().collect();
+		expectedResult = "1,2,(null),(null),ot\n" +
+				"3,2,(null),(null),tt\n" +
+				"3,1,(null),(null),to\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testCreateCsvFileDelimiterConfiguration() throws Exception {
+		/*
+		 * Test with an Edge and Vertex csv file. Tests the configuration methods
+		 * fieldDelimiterEdges and fieldDelimiterVertices, together with
+		 * ignoreFirstLineEdges and ignoreFirstLineVertices.
+		 * Also tests lineDelimiterEdges; no line delimiter is configured for the
+		 * vertex file, whose lines end with "\n".
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		final String fileContent =  "header\n1;1\n"+
+				"2;2\n"+
+				"3;3\n";
+
+		final FileInputSplit split = createTempFile(fileContent);
+
+		final String fileContent2 =  "header|1:2:ot|"+
+				"3:2:tt|"+
+				"3:1:to|";
+
+		final FileInputSplit split2 = createTempFile(fileContent2);
+
+		Graph<Long, Long, String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env).
+				ignoreFirstLineEdges().ignoreFirstLineVertices().
+				fieldDelimiterEdges(":").fieldDelimiterVertices(";").
+				lineDelimiterEdges("|").
+				types(Long.class, Long.class, String.class);
+
+		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,1,2,ot\n" +
+				"3,2,3,2,tt\n" +
+				"3,1,3,1,to\n";
+
+		compareResultAsTuples(result, expectedResult);
+
+	}
+
+	/*----------------------------------------------------------------------------------------------------------------*/
+
+	/**
+	 * Vertex initialiser that returns the constant 0.1 for every vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 0.1d;
+		}
+	}
+
+	/**
+	 * Writes {@code content} UTF-8 encoded into a new temporary file, which is registered
+	 * for deletion on JVM exit.
+	 *
+	 * @param content the text to write into the file
+	 * @return a split with number 0 that starts at offset 0, spans the length of the
+	 *         written file and names "localhost" as its only host
+	 * @throws IOException if the file cannot be created or written
+	 */
+	private FileInputSplit createTempFile(String content) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
+
+		OutputStreamWriter wrt = new OutputStreamWriter(
+				new FileOutputStream(tempFile), Charsets.UTF_8
+		);
+		try {
+			wrt.write(content);
+		}
+		finally {
+			// always release the file handle, also when writing fails
+			wrt.close();
+		}
+
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0,
+							tempFile.length(), new String[] {"localhost"});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
new file mode 100644
index 0000000..20cbca5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests {@code Graph.fromDataSet(edges, mapper, env)} with mappers that produce different
+ * kinds of vertex values; the vertices of the created graph are compared with the expected
+ * tuples.
+ */
+@RunWith(Parameterized.class)
+public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
+
+	private String expectedResult;
+
+	public GraphCreationWithMapperITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Test create() with edge dataset and a mapper that assigns a double constant as value.
+	 */
+	@Test
+	public void testWithDoubleValueMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignDoubleValueMapper(), env);
+
+		DataSet<Vertex<Long, Double>> vertices = graph.getVertices();
+		List<Vertex<Long, Double>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,0.1\n"
+				+ "2,0.1\n"
+				+ "3,0.1\n"
+				+ "4,0.1\n"
+				+ "5,0.1\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test create() with edge dataset and a mapper that assigns a Tuple2 as value.
+	 */
+	@Test
+	public void testWithTuple2ValueMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
+
+		DataSet<Vertex<Long, Tuple2<Long, Long>>> vertices = graph.getVertices();
+		List<Vertex<Long, Tuple2<Long, Long>>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,(2,42)\n"
+				+ "2,(4,42)\n"
+				+ "3,(6,42)\n"
+				+ "4,(8,42)\n"
+				+ "5,(10,42)\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test create() with edge dataset with String key type and a mapper that assigns a
+	 * double constant as value.
+	 */
+	@Test
+	public void testWithConstantValueMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<String, Double, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getStringLongEdgeData(env), new AssignDoubleConstantMapper(), env);
+
+		DataSet<Vertex<String, Double>> vertices = graph.getVertices();
+		List<Vertex<String, Double>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,0.1\n"
+				+ "2,0.1\n"
+				+ "3,0.1\n"
+				+ "4,0.1\n"
+				+ "5,0.1\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Test create() with edge dataset and a mapper that assigns a custom vertex value.
+	 */
+	@Test
+	public void testWithDCustomValueMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
+
+		DataSet<Vertex<Long, DummyCustomType>> vertices = graph.getVertices();
+		List<Vertex<Long, DummyCustomType>> collectedVertices = vertices.collect();
+
+		expectedResult = "1,(F,0)\n"
+				+ "2,(F,1)\n"
+				+ "3,(F,2)\n"
+				+ "4,(F,3)\n"
+				+ "5,(F,4)\n";
+
+		compareResultAsTuples(collectedVertices, expectedResult);
+	}
+
+	/**
+	 * Returns the constant 0.1 for every Long vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 0.1d;
+		}
+	}
+
+	/**
+	 * Returns the pair (2 * id, 42) for every vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Long vertexId) {
+			return new Tuple2<Long, Long>(vertexId * 2, 42L);
+		}
+	}
+
+	/**
+	 * Returns the constant 0.1 for every String vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
+		@Override
+		public Double map(String value) {
+			return 0.1d;
+		}
+	}
+
+	/**
+	 * Returns a {@link DummyCustomType} constructed from {@code id - 1} and {@code false}.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
+		@Override
+		public DummyCustomType map(Long vertexId) {
+			return new DummyCustomType(vertexId.intValue() - 1, false);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
new file mode 100644
index 0000000..d6e5a9c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Integration tests for the mutation operations of {@link Graph}: adding and
+ * removing vertices and edges.
+ *
+ * <p>Every test builds the same graph from {@code TestGraphUtils.getLongLongVertexData}
+ * and {@code TestGraphUtils.getLongLongEdgeData}, applies a single mutation and
+ * compares the collected vertices or edges with the expected tuples.
+ */
+@RunWith(Parameterized.class)
+public class GraphMutationsITCase extends MultipleProgramsTestBase {
+
+	/** Expected vertices when a mutation leaves the vertex set of the test graph unchanged. */
+	private static final String INITIAL_VERTICES = "1,1\n" +
+			"2,2\n" +
+			"3,3\n" +
+			"4,4\n" +
+			"5,5\n";
+
+	/** Expected edges when a mutation leaves the edge set of the test graph unchanged. */
+	private static final String INITIAL_EDGES = "1,2,12\n" +
+			"1,3,13\n" +
+			"2,3,23\n" +
+			"3,4,34\n" +
+			"3,5,35\n" +
+			"4,5,45\n" +
+			"5,1,51\n";
+
+	/**
+	 * Creates the test case for the given execution mode.
+	 *
+	 * @param mode the execution mode handed over to the base class
+	 */
+	public GraphMutationsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Test addVertex() -- simple case
+	 */
+	@Test
+	public void testAddVertex() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L));
+
+		assertVertices(graph,
+				"1,1\n" +
+				"2,2\n" +
+				"3,3\n" +
+				"4,4\n" +
+				"5,5\n" +
+				"6,6\n");
+	}
+
+	/**
+	 * Test addVertices() -- simple case
+	 */
+	@Test
+	public void testAddVertices() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(6L, 6L));
+		vertices.add(new Vertex<Long, Long>(7L, 7L));
+
+		graph = graph.addVertices(vertices);
+
+		assertVertices(graph,
+				"1,1\n" +
+				"2,2\n" +
+				"3,3\n" +
+				"4,4\n" +
+				"5,5\n" +
+				"6,6\n" +
+				"7,7\n");
+	}
+
+	/**
+	 * Test addVertex() -- add an existing vertex
+	 */
+	@Test
+	public void testAddVertexExisting() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L));
+
+		assertVertices(graph, INITIAL_VERTICES);
+	}
+
+	/**
+	 * Test addVertices() -- add two existing vertices
+	 */
+	@Test
+	public void testAddVerticesBothExisting() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 1L));
+		vertices.add(new Vertex<Long, Long>(3L, 3L));
+
+		graph = graph.addVertices(vertices);
+
+		assertVertices(graph, INITIAL_VERTICES);
+	}
+
+	/**
+	 * Test addVertices() -- add one existing vertex and one new vertex
+	 */
+	@Test
+	public void testAddVerticesOneExisting() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 1L));
+		vertices.add(new Vertex<Long, Long>(6L, 6L));
+
+		graph = graph.addVertices(vertices);
+
+		assertVertices(graph,
+				"1,1\n" +
+				"2,2\n" +
+				"3,3\n" +
+				"4,4\n" +
+				"5,5\n" +
+				"6,6\n");
+	}
+
+	/**
+	 * Test removeVertex() -- simple case
+	 */
+	@Test
+	public void testRemoveVertex() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n");
+	}
+
+	/**
+	 * Test removeVertices() -- simple case
+	 */
+	@Test
+	public void testRemoveVertices() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
+		verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
+		verticesToBeRemoved.add(new Vertex<Long, Long>(2L, 2L));
+
+		graph = graph.removeVertices(verticesToBeRemoved);
+
+		assertEdges(graph,
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Test removeVertex() -- remove an invalid vertex
+	 */
+	@Test
+	public void testRemoveInvalidVertex() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
+
+		assertEdges(graph, INITIAL_EDGES);
+	}
+
+	/**
+	 * Test removeVertices() -- remove one invalid vertex and a valid one
+	 */
+	@Test
+	public void testRemoveOneValidOneInvalidVertex() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
+		verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
+		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+
+		graph = graph.removeVertices(verticesToBeRemoved);
+
+		assertEdges(graph,
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Test removeVertices() -- remove two invalid vertices
+	 */
+	@Test
+	public void testRemoveBothInvalidVertices() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
+		verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
+		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+
+		graph = graph.removeVertices(verticesToBeRemoved);
+
+		assertEdges(graph, INITIAL_EDGES);
+	}
+
+	/**
+	 * Test removeVertices() -- remove two invalid vertices and verify the data set of vertices
+	 */
+	@Test
+	public void testRemoveBothInvalidVerticesVertexResult() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
+		verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
+		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+
+		graph = graph.removeVertices(verticesToBeRemoved);
+
+		assertVertices(graph, INITIAL_VERTICES);
+	}
+
+	/**
+	 * Test addEdge() -- simple case
+	 */
+	@Test
+	public void testAddEdge() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
+				61L);
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n" +
+				"6,1,61\n");
+	}
+
+	/**
+	 * Test addEdges() -- simple case
+	 */
+	@Test
+	public void testAddEdges() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
+		edgesToBeAdded.add(new Edge<Long, Long>(2L, 4L, 24L));
+		edgesToBeAdded.add(new Edge<Long, Long>(4L, 1L, 41L));
+
+		graph = graph.addEdges(edgesToBeAdded);
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"2,4,24\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,1,41\n" +
+				"4,5,45\n" +
+				"5,1,51\n");
+	}
+
+	/**
+	 * Test addEdges() -- the source and target vertices do not exist in the graph
+	 */
+	@Test
+	public void testAddEdgesInvalidVertices() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
+		edgesToBeAdded.add(new Edge<Long, Long>(6L, 1L, 61L));
+		edgesToBeAdded.add(new Edge<Long, Long>(7L, 1L, 71L));
+
+		graph = graph.addEdges(edgesToBeAdded);
+
+		assertEdges(graph, INITIAL_EDGES);
+	}
+
+	/**
+	 * Test addEdge() -- add already existing edge
+	 */
+	@Test
+	public void testAddExistingEdge() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
+				12L);
+
+		// The duplicate edge is expected to show up twice in the result.
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n");
+	}
+
+	/**
+	 * Test removeEdge() -- simple case
+	 */
+	@Test
+	public void testRemoveEdge() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Test removeEdges() -- simple case
+	 */
+	@Test
+	public void testRemoveEdges() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
+		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
+		edgesToBeRemoved.add(new Edge<Long, Long>(2L, 3L, 23L));
+
+		graph = graph.removeEdges(edgesToBeRemoved);
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Test removeEdges() -- try to remove the same edge twice
+	 */
+	@Test
+	public void testRemoveSameEdgeTwice() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
+		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
+		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
+
+		graph = graph.removeEdges(edgesToBeRemoved);
+
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Test removeEdge() -- invalid edge
+	 */
+	@Test
+	public void testRemoveInvalidEdge() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
+
+		assertEdges(graph, INITIAL_EDGES);
+	}
+
+	/**
+	 * Test removeEdges() -- one edge is valid, the other is invalid
+	 */
+	@Test
+	public void testRemoveOneValidOneInvalidEdge() throws Exception {
+		Graph<Long, Long, Long> graph = createTestGraph();
+
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
+		// Valid: the edge 5 -> 1 is part of the test graph.
+		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
+		// Invalid: the test graph has no edge 6 -> 1.
+		edgesToBeRemoved.add(new Edge<Long, Long>(6L, 1L, 61L));
+
+		graph = graph.removeEdges(edgesToBeRemoved);
+
+		// Only the valid edge may disappear from the result.
+		assertEdges(graph,
+				"1,2,12\n" +
+				"1,3,13\n" +
+				"2,3,23\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n");
+	}
+
+	/**
+	 * Builds the graph every test starts from, using the vertex and edge data
+	 * sets provided by {@link TestGraphUtils}.
+	 *
+	 * @return a freshly created graph bound to the current execution environment
+	 */
+	private Graph<Long, Long, Long> createTestGraph() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		return Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+	}
+
+	/**
+	 * Collects the vertices of the given graph and compares them with the expected tuples.
+	 *
+	 * @param graph the graph whose vertices are checked
+	 * @param expected the expected vertices, one {@code id,value} tuple per line
+	 */
+	private void assertVertices(Graph<Long, Long, Long> graph, String expected) throws Exception {
+		DataSet<Vertex<Long, Long>> data = graph.getVertices();
+		List<Vertex<Long, Long>> result = data.collect();
+
+		compareResultAsTuples(result, expected);
+	}
+
+	/**
+	 * Collects the edges of the given graph and compares them with the expected tuples.
+	 *
+	 * @param graph the graph whose edges are checked
+	 * @param expected the expected edges, one {@code source,target,value} tuple per line
+	 */
+	private void assertEdges(Graph<Long, Long, Long> graph, String expected) throws Exception {
+		DataSet<Edge<Long, Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result = data.collect();
+
+		compareResultAsTuples(result, expected);
+	}
+}
\ No newline at end of file