You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:57 UTC

[19/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
new file mode 100644
index 0000000..e347bc5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.MusicProfilesData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+ * The input is a set of <userId - songId - playCount> triplets and
+ * a set of bad records, i.e. song ids that should not be trusted.
+ * Initially, we use the DataSet API to filter out the bad records.
+ * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
+ * the top song (most listened) per user.
+ * Then, we use the DataSet API again, to create a user-user similarity graph,
+ * based on common songs, where users that are listeners of the same song
+ * are connected. A user-defined threshold on the playcount value
+ * defines when a user is considered to be a listener of a song.
+ * Finally, we use the graph API to run the label propagation community detection algorithm on
+ * the similarity graph.
+ *
+ * The triplets input is expected to be given as one triplet per line,
+ * in the following format: "<userID>\t<songID>\t<playcount>".
+ *
+ * The mismatches input file is expected to contain one mismatch record per line,
+ * in the following format:
+ * "ERROR: <songID trackID> song_title"
+ *
+ * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
+ */
+@SuppressWarnings("serial")
+public class MusicProfiles implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		/**
+		 * Read the user-song-play triplets.
+		 */
+		DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env);
+
+		/**
+		 * Read the mismatches dataset and extract the songIDs
+		 */
+		DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds());
+
+		/**
+		 * Filter out the mismatches from the triplets dataset
+		 */
+		DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
+				.coGroup(mismatches).where(1).equalTo(0)
+				.with(new FilterOutMismatches());
+
+		/**
+		 * Create a user -> song weighted bipartite graph where the edge weights
+		 * correspond to play counts
+		 */
+		Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env);
+
+		/**
+		 * Get the top track (most listened) for each user
+		 */
+		DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
+				.groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
+				.filter(new FilterSongNodes());
+
+		if (fileOutput) {
+			usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t");
+		} else {
+			usersWithTopTrack.print();
+		}
+
+		/**
+		 * Create a user-user similarity graph, based on common songs, i.e. two
+		 * users that listen to the same song are connected. For each song, we
+		 * create an edge between each pair of its in-neighbors.
+		 */
+		DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
+				.getEdges()
+				// filter out user-song edges that are below the playcount threshold
+				.filter(new FilterFunction<Edge<String, Integer>>() {
+					public boolean filter(Edge<String, Integer> edge) {
+						return (edge.getValue() > playcountThreshold);
+					}
+				}).groupBy(1)
+				.reduceGroup(new CreateSimilarUserEdges()).distinct();
+
+		Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
+				new MapFunction<String, Long>() {
+					public Long map(String value) {
+						return 1l;
+					}
+				}, env).getUndirected();
+
+		/**
+		 * Detect user communities using the label propagation library method
+		 */
+		// Initialize each vertex with a unique numeric label and run the label propagation algorithm
+		DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
+				.zipWithUniqueId(similarUsersGraph.getVertexIds())
+				.map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
+					@Override
+					public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
+						return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
+					}
+				});
+
+		DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
+				.joinWithVertices(idsWithInitialLabels,
+						new MapFunction<Tuple2<Long, Long>, Long>() {
+							public Long map(Tuple2<Long, Long> value) {
+								return value.f1;
+							}
+						}).run(new LabelPropagation<String, NullValue>(maxIterations));
+
+		if (fileOutput) {
+			verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute();
+		} else {
+			verticesWithCommunity.print();
+		}
+
+	}
+
+	/**
+	 * Extracts the song id from a mismatch record: the line is split on whitespace and
+	 * the second token, without its first character, is emitted.
+	 */
+	public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> {
+
+		@Override
+		public Tuple1<String> map(String value) {
+			// second whitespace-separated token = one prefix character followed by the song id
+			final String secondToken = value.split("\\s+")[1];
+			return new Tuple1<String>(secondToken.substring(1));
+		}
+	}
+
+	/**
+	 * Co-group function that forwards the triplets of a group only when the group
+	 * contains no invalid-song record.
+	 */
+	public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>,
+		Tuple1<String>, Tuple3<String, String, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets,
+				Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) {
+
+			// a single mismatch record is enough to discard the whole group
+			if (invalidSongs.iterator().hasNext()) {
+				return;
+			}
+
+			for (Tuple3<String, String, Integer> validTriplet : triplets) {
+				out.collect(validTriplet);
+			}
+		}
+	}
+
+	/**
+	 * Keeps only the tuples whose second field is a non-empty string.
+	 */
+	public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> {
+
+		@Override
+		public boolean filter(Tuple2<String, String> value) throws Exception {
+			final String topSong = value.f1;
+			return !topSong.isEmpty();
+		}
+	}
+
+	/**
+	 * Emits, for each vertex, the target of the edge with the largest value.
+	 * When no edge has a value greater than zero, the empty string is emitted as the song.
+	 */
+	public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer,
+		Tuple2<String, String>> {
+
+		@Override
+		public void iterateEdges(Vertex<String, NullValue> vertex,
+				Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
+
+			String bestSong = "";
+			int bestCount = 0;
+
+			for (Edge<String, Integer> songEdge : edges) {
+				final int count = songEdge.getValue();
+				if (count <= bestCount) {
+					// not strictly better than what we already have
+					continue;
+				}
+				bestCount = count;
+				bestSong = songEdge.getTarget();
+			}
+
+			out.collect(new Tuple2<String, String>(vertex.getId(), bestSong));
+		}
+	}
+
+	/**
+	 * Group-reduce function that collects the sources of all edges in a group and
+	 * emits one edge for every pair of them, in the order in which they were seen.
+	 */
+	public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>,
+		Edge<String, NullValue>> {
+
+		@Override
+		public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) {
+
+			// materialize the edge sources, the pairing below needs random access
+			final List<String> users = new ArrayList<String>();
+			for (Edge<String, Integer> userSongEdge : edges) {
+				users.add(userSongEdge.getSource());
+			}
+
+			final int userCount = users.size();
+			for (int first = 0; first < userCount - 1; first++) {
+				final String left = users.get(first);
+				for (int second = first + 1; second < userCount; second++) {
+					final String right = users.get(second);
+					out.collect(new Edge<String, NullValue>(left, right, NullValue.getInstance()));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Returns the fixed, human-readable description of this example program.
+	 *
+	 * @return the string "Music Profiles Example"
+	 */
+	@Override
+	public String getDescription() {
+		return "Music Profiles Example";
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	// true once six command-line arguments were accepted: inputs are then read from
+	// files and results are written to files instead of being printed
+	private static boolean fileOutput = false;
+
+	// path of the user-song-playcount triplets input (first argument)
+	private static String userSongTripletsInputPath = null;
+
+	// path of the song mismatches input (second argument)
+	private static String mismatchesInputPath = null;
+
+	// path the top track per user is written to (third argument)
+	private static String topTracksOutputPath = null;
+
+	// user-song edges whose play count is not greater than this value are ignored
+	// when the similarity graph is built (fourth argument)
+	private static int playcountThreshold = 0;
+
+	// path the detected communities are written to (fifth argument)
+	private static String communitiesOutputPath = null;
+
+	// iteration count handed to the label propagation algorithm (sixth argument)
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 6) {
+				System.err.println("Usage: MusicProfiles <input user song triplets path>" +
+						" <input song mismatches path> <output top tracks path> "
+						+ "<playcount threshold> <output communities path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			userSongTripletsInputPath = args[0];
+			mismatchesInputPath = args[1];
+			topTracksOutputPath = args[2];
+			playcountThreshold = Integer.parseInt(args[3]);
+			communitiesOutputPath = args[4];
+			maxIterations = Integer.parseInt(args[5]);
+		} else {
+			System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: MusicProfiles <input user song triplets path>" +
+					" <input song mismatches path> <output top tracks path> "
+					+ "<playcount threshold> <output communities path> <num iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Returns the user-song-playcount triplets: the built-in default data unless file
+	 * mode is active, in which case the tab-separated triplets file is read.
+	 */
+	private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
+		if (!fileOutput) {
+			return MusicProfilesData.getUserSongTriplets(env);
+		}
+
+		return env.readCsvFile(userSongTripletsInputPath)
+				.lineDelimiter("\n")
+				.fieldDelimiter("\t")
+				.types(String.class, String.class, Integer.class);
+	}
+
+	/**
+	 * Returns the raw mismatch records: the built-in default data unless file mode is
+	 * active, in which case the mismatches file is read line by line.
+	 */
+	private static DataSet<String> getMismatchesData(ExecutionEnvironment env) {
+		if (!fileOutput) {
+			return MusicProfilesData.getMismatches(env);
+		}
+
+		return env.readTextFile(mismatchesInputPath);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..ef09bff
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class SingleSourceShortestPaths implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		// stop right away when the command line could not be parsed
+		final boolean parsed = parseParameters(args);
+		if (!parsed) {
+			return;
+		}
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// build the graph from the edges; InitVertices provides the initial vertex values
+		final DataSet<Edge<Long, Double>> edgeSet = getEdgesDataSet(env);
+		final Graph<Long, Double, Double> inputGraph =
+				Graph.fromDataSet(edgeSet, new InitVertices(srcVertexId), env);
+
+		// run the vertex-centric iteration and take the vertices of the resulting graph
+		final DataSet<Vertex<Long, Double>> distances = inputGraph
+				.runVertexCentricIteration(new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations)
+				.getVertices();
+
+		// emit result
+		if (!fileOutput) {
+			distances.print();
+			return;
+		}
+
+		distances.writeAsCsv(outputPath, "\n", ",");
+
+		// since file sinks are lazy, we trigger the execution explicitly
+		env.execute("Single Source Shortest Paths Example");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Assigns the initial value of a vertex: 0.0 for the vertex whose id equals the
+	 * configured source id, positive infinity for every other vertex.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
+
+		private final long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		@Override
+		public Double map(Long id) {
+			final boolean isSource = id.longValue() == srcId;
+			return isSource ? 0.0 : Double.POSITIVE_INFINITY;
+		}
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
+
+			Double minDistance = Double.MAX_VALUE;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertex.getValue() > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Sends to the target of every edge returned by {@code getEdges()} the value of
+	 * the given vertex increased by the value of that edge.
+	 */
+	@SuppressWarnings("serial")
+	public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) {
+			final Double distanceSoFar = vertex.getValue();
+			for (Edge<Long, Double> nextEdge : getEdges()) {
+				final Long neighbour = nextEdge.getTarget();
+				sendMessageTo(neighbour, distanceSoFar + nextEdge.getValue());
+			}
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	// true once four command-line arguments were accepted: edges are then read from
+	// a file and the result is written to a file instead of being printed
+	private static boolean fileOutput = false;
+
+	// id of the source vertex handed to InitVertices (first argument)
+	private static Long srcVertexId = 1l;
+
+	// path of the edges input file (second argument)
+	private static String edgesInputPath = null;
+
+	// path the resulting vertices are written to (third argument)
+	private static String outputPath = null;
+
+	// iteration limit handed to the vertex-centric iteration (fourth argument)
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing Single Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Returns the edges of the input graph: the built-in default edges unless file mode
+	 * is active, in which case the tab-separated edges file is read and every
+	 * (source, target, distance) record is mapped to an edge.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (!fileOutput) {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+
+		return env.readCsvFile(edgesInputPath)
+				.lineDelimiter("\n")
+				.fieldDelimiter("\t")
+				.types(Long.class, Long.class, Double.class)
+				.map(new Tuple3ToEdgeMap<Long, Double>());
+	}
+
+	/**
+	 * Returns the fixed, human-readable description of this example program.
+	 *
+	 * @return the string "Vertex-centric Single Source Shortest Paths"
+	 */
+	@Override
+	public String getDescription() {
+		return "Vertex-centric Single Source Shortest Paths";
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
new file mode 100644
index 0000000..c37b2b5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds the default data set of the Simple Community Detection test program together
+ * with two smaller edge sets and the constants that accompany them.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class CommunityDetectionData {
+
+	// the algorithm is not guaranteed to always converge
+	public static final Integer MAX_ITERATIONS = 30;
+
+	public static final double DELTA = 0.5;
+
+	// NOTE(review): presumably the expected "vertexId,community" output for the simple edge set - confirm
+	public static final String COMMUNITIES_SINGLE_ITERATION =
+			"1,5\n"
+			+ "2,6\n"
+			+ "3,1\n"
+			+ "4,1\n"
+			+ "5,1\n"
+			+ "6,8\n"
+			+ "7,8\n"
+			+ "8,7";
+
+	// NOTE(review): presumably the expected "vertexId,community" output for the tie edge set - confirm
+	public static final String COMMUNITIES_WITH_TIE =
+			"1,2\n"
+			+ "2,1\n"
+			+ "3,1\n"
+			+ "4,1\n"
+			+ "5,1";
+
+	/**
+	 * Creates the default edge data set: 18 weighted edges over the vertices 1 to 12,
+	 * with the weights 1.0 to 18.0 in insertion order.
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		final List<Edge<Long, Double>> defaultEdges = new ArrayList<Edge<Long, Double>>();
+		defaultEdges.add(edge(1L, 2L, 1.0));
+		defaultEdges.add(edge(1L, 3L, 2.0));
+		defaultEdges.add(edge(1L, 4L, 3.0));
+		defaultEdges.add(edge(2L, 3L, 4.0));
+		defaultEdges.add(edge(2L, 4L, 5.0));
+		defaultEdges.add(edge(3L, 5L, 6.0));
+		defaultEdges.add(edge(5L, 6L, 7.0));
+		defaultEdges.add(edge(5L, 7L, 8.0));
+		defaultEdges.add(edge(6L, 7L, 9.0));
+		defaultEdges.add(edge(7L, 12L, 10.0));
+		defaultEdges.add(edge(8L, 9L, 11.0));
+		defaultEdges.add(edge(8L, 10L, 12.0));
+		defaultEdges.add(edge(8L, 11L, 13.0));
+		defaultEdges.add(edge(9L, 10L, 14.0));
+		defaultEdges.add(edge(9L, 11L, 15.0));
+		defaultEdges.add(edge(10L, 11L, 16.0));
+		defaultEdges.add(edge(10L, 12L, 17.0));
+		defaultEdges.add(edge(11L, 12L, 18.0));
+
+		return env.fromCollection(defaultEdges);
+	}
+
+	/**
+	 * Creates a small edge data set: 8 weighted edges over the vertices 1 to 8,
+	 * with the weights 1.0 to 8.0 in insertion order.
+	 */
+	public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) {
+
+		final List<Edge<Long, Double>> simpleEdges = new ArrayList<Edge<Long, Double>>();
+		simpleEdges.add(edge(1L, 2L, 1.0));
+		simpleEdges.add(edge(1L, 3L, 2.0));
+		simpleEdges.add(edge(1L, 4L, 3.0));
+		simpleEdges.add(edge(1L, 5L, 4.0));
+		simpleEdges.add(edge(2L, 6L, 5.0));
+		simpleEdges.add(edge(6L, 7L, 6.0));
+		simpleEdges.add(edge(6L, 8L, 7.0));
+		simpleEdges.add(edge(7L, 8L, 8.0));
+
+		return env.fromCollection(simpleEdges);
+	}
+
+	/**
+	 * Creates an edge data set in which vertex 1 is connected to the vertices 2 to 5,
+	 * every edge carrying the same weight 1.0.
+	 */
+	public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+		final List<Edge<Long, Double>> tieEdges = new ArrayList<Edge<Long, Double>>();
+		tieEdges.add(edge(1L, 2L, 1.0));
+		tieEdges.add(edge(1L, 3L, 1.0));
+		tieEdges.add(edge(1L, 4L, 1.0));
+		tieEdges.add(edge(1L, 5L, 1.0));
+
+		return env.fromCollection(tieEdges);
+	}
+
+	// shorthand for creating one weighted edge
+	private static Edge<Long, Double> edge(long source, long target, double weight) {
+		return new Edge<Long, Double>(source, target, weight);
+	}
+
+	// not meant to be instantiated
+	private CommunityDetectionData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
new file mode 100644
index 0000000..67864eb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the connected components example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class ConnectedComponentsDefaultData {
+
+	public static final Integer MAX_ITERATIONS = 4;
+
+	public static final String EDGES = "1	2\n" + "2	3\n" + "2	4\n" + "3	4";
+
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L},
+		new Object[]{2L, 3L},
+		new Object[]{2L, 4L},
+		new Object[]{3L, 4L}
+	};
+
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>();
+		for (Object[] edge : DEFAULT_EDGES) {
+			edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance()));
+		}
+		return env.fromCollection(edgeList);
+	}
+
+	public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";
+
+	private ConnectedComponentsDefaultData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
new file mode 100644
index 0000000..80765bf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.EuclideanGraphWeighing;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+	public static final int NUM_VERTICES = 9;
+
+	/** Textual form of the default vertices, one "id,x,y" record per line. */
+	public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+			"6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+	/**
+	 * Creates the default vertices 1 to {@link #NUM_VERTICES}; vertex i carries the point (i, i).
+	 *
+	 * @param env the execution environment the data set is created in
+	 * @return the default vertex data set
+	 */
+	public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>();
+		for (int i = 1; i <= NUM_VERTICES; i++) {
+			// valueOf instead of the deprecated boxing constructors
+			vertices.add(new Vertex<Long, EuclideanGraphWeighing.Point>(Long.valueOf(i),
+					new EuclideanGraphWeighing.Point(Double.valueOf(i), Double.valueOf(i))));
+		}
+
+		return env.fromCollection(vertices);
+	}
+
+	/** Textual form of the default edges, one "source,target" record per line. */
+	public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+			"3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+			"7,8\n" + "7,9\n" +  "8,9";
+
+	/**
+	 * Creates the default edges, all with the value 0.0. The list holds exactly the
+	 * 15 edges that {@link #EDGES} describes, each of them once.
+	 *
+	 * @param env the execution environment the data set is created in
+	 * @return the default edge data set
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	/** Textual form of the weighted edges, one "source,target,weight" record per line. */
+	public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+			"2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+			"4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+			"6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+			"8,9,1.4142135623730951";
+
+	// not meant to be instantiated
+	private EuclideanGraphData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
new file mode 100644
index 0000000..7fbee46
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+public class ExampleUtils {
+
+	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+	public static void printResult(DataSet set, String msg) {
+		set.output(new PrintingOutputFormatWithMessage(msg) {
+		});
+	}
+
+	public static class PrintingOutputFormatWithMessage<T> implements
+			OutputFormat<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient PrintStream stream;
+
+		private transient String prefix;
+
+		private String message;
+
+		// --------------------------------------------------------------------------------------------
+
+		/**
+		 * Instantiates a printing output format that prints to standard out.
+		 */
+		public PrintingOutputFormatWithMessage() {
+		}
+
+		public PrintingOutputFormatWithMessage(String msg) {
+			this.message = msg;
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+			// get the target stream
+			this.stream = System.out;
+
+			// set the prefix to message
+			this.prefix = message + ": ";
+		}
+
+		@Override
+		public void writeRecord(T record) {
+			if (this.prefix != null) {
+				this.stream.println(this.prefix + record.toString());
+			} else {
+				this.stream.println(record.toString());
+			}
+		}
+
+		@Override
+		public void close() {
+			this.stream = null;
+			this.prefix = null;
+		}
+
+		@Override
+		public String toString() {
+			return "Print to System.out";
+		}
+
+		@Override
+		public void configure(Configuration parameters) {
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static DataSet<Vertex<Long, NullValue>> getVertexIds(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, NullValue>>() {
+					public Vertex<Long, NullValue> map(Long l) {
+						return new Vertex<Long, NullValue>(l, NullValue
+								.getInstance());
+					}
+				});
+	}
+
+	@SuppressWarnings("serial")
+	public static DataSet<Edge<Long, NullValue>> getRandomEdges(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
+	}
+
+	public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
+			ExecutionEnvironment env) {
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 1.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 4.0));
+		vertices.add(new Vertex<Long, Double>(5L, 5.0));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
+		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
+		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private ExampleUtils() {
+		throw new RuntimeException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
new file mode 100644
index 0000000..7b69ec0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default input and expected output for the IncrementalSSSP example program.
+ * The example falls back to these data sets when it is started without parameters.
+ */
+public class IncrementalSSSPData {
+
+	public static final int NUM_VERTICES = 5;
+
+	public static final String VERTICES =
+			"1,6.0\n" +
+			"2,2.0\n" +
+			"3,3.0\n" +
+			"4,1.0\n" +
+			"5,0.0";
+
+	/** Builds the five default vertices; the same data as {@link #VERTICES}. */
+	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Double>> defaultVertices = new ArrayList<Vertex<Long, Double>>();
+		defaultVertices.add(vertex(1L, 6.0));
+		defaultVertices.add(vertex(2L, 2.0));
+		defaultVertices.add(vertex(3L, 3.0));
+		defaultVertices.add(vertex(4L, 1.0));
+		defaultVertices.add(vertex(5L, 0.0));
+
+		return env.fromCollection(defaultVertices);
+	}
+
+	public static final String EDGES =
+			"1,3,3.0\n" +
+			"2,4,3.0\n" +
+			"2,5,2.0\n" +
+			"3,2,1.0\n" +
+			"3,5,5.0\n" +
+			"4,5,1.0";
+
+	/** Builds the six default edges; the same data as {@link #EDGES}. */
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> defaultEdges = new ArrayList<Edge<Long, Double>>();
+		defaultEdges.add(edge(1L, 3L, 3.0));
+		defaultEdges.add(edge(2L, 4L, 3.0));
+		defaultEdges.add(edge(2L, 5L, 2.0));
+		defaultEdges.add(edge(3L, 2L, 1.0));
+		defaultEdges.add(edge(3L, 5L, 5.0));
+		defaultEdges.add(edge(4L, 5L, 1.0));
+
+		return env.fromCollection(defaultEdges);
+	}
+
+	public static final String EDGES_IN_SSSP =
+			"1,3,3.0\n" +
+			"2,5,2.0\n" +
+			"3,2,1.0\n" +
+			"4,5,1.0";
+
+	/** Builds the four edges listed in {@link #EDGES_IN_SSSP}. */
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> ssspEdges = new ArrayList<Edge<Long, Double>>();
+		ssspEdges.add(edge(1L, 3L, 3.0));
+		ssspEdges.add(edge(2L, 5L, 2.0));
+		ssspEdges.add(edge(3L, 2L, 1.0));
+		ssspEdges.add(edge(4L, 5L, 1.0));
+
+		return env.fromCollection(ssspEdges);
+	}
+
+	public static final String SRC_EDGE_TO_BE_REMOVED = "2";
+
+	public static final String TRG_EDGE_TO_BE_REMOVED = "5";
+
+	public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
+
+	/** Returns the edge described by the three {@code *_EDGE_TO_BE_REMOVED} constants. */
+	public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+
+		return edge(2L, 5L, 2.0);
+	}
+
+	public static final String RESULTED_VERTICES =
+			"1," + Double.MAX_VALUE + "\n" +
+			"2," + Double.MAX_VALUE + "\n" +
+			"3," + Double.MAX_VALUE + "\n" +
+			"4,1.0\n" +
+			"5,0.0";
+
+	/** Shorthand for a {@code Vertex<Long, Double>} with the given id and value. */
+	private static Vertex<Long, Double> vertex(Long id, Double value) {
+		return new Vertex<Long, Double>(id, value);
+	}
+
+	/** Shorthand for an {@code Edge<Long, Double>} with the given endpoints and value. */
+	private static Edge<Long, Double> edge(Long source, Long target, Double value) {
+		return new Edge<Long, Double>(source, target, value);
+	}
+
+	private IncrementalSSSPData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..7564b95
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+	/** The default edges as text, one {@code source target} pair per line. */
+	public static final String EDGES = "1	2\n" + "1	3\n" + "1	4\n" + "1	5\n" + "2	3\n" + "2	4\n" +
+			"2	5\n" + "3	4\n" + "3	5\n" + "4	5";
+
+	/**
+	 * Builds the same ten edges as {@link #EDGES}, each with an initial value of 0.0.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the default edge data set
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		// The 0.0 literal is autoboxed, which avoids allocating a fresh box per edge
+		// via the Double constructor.
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(3L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * One {@code source,target,value} line per default edge.
+	 * NOTE(review): presumably the expected similarity per edge; confirm against the consuming test.
+	 */
+	public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+			"2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+	private JaccardSimilarityMeasureData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
new file mode 100644
index 0000000..0a92097
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class LabelPropagationData {
+
+	/**
+	 * One {@code vertexId,label} line for each of the vertices 1 to 7.
+	 * NOTE(review): presumably the expected labels for the default vertex and edge
+	 * sets; confirm against the consuming test.
+	 */
+	public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,40\n" +
+			"5,40\n" +
+			"6,40\n" +
+			"7,40\n";
+
+	/**
+	 * One {@code vertexId,label} line for each of the vertices 1 to 9.
+	 * NOTE(review): presumably the expected labels for the tie vertex and edge sets;
+	 * confirm against the consuming test.
+	 */
+	public static final String LABELS_WITH_TIE ="1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,10\n" +
+			"5,20\n" +
+			"6,20\n" +
+			"7,20\n" +
+			"8,20\n" +
+			"9,20\n";
+
+	private LabelPropagationData() {}
+
+	/**
+	 * Builds the seven default vertices; the vertex value is the initial label.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the default vertex data set
+	 */
+	public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+		// Long literals use the upper-case L suffix: a lower-case l is easily
+		// misread as the digit 1.
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 10L));
+		vertices.add(new Vertex<Long, Long>(2L, 10L));
+		vertices.add(new Vertex<Long, Long>(3L, 30L));
+		vertices.add(new Vertex<Long, Long>(4L, 40L));
+		vertices.add(new Vertex<Long, Long>(5L, 40L));
+		vertices.add(new Vertex<Long, Long>(6L, 40L));
+		vertices.add(new Vertex<Long, Long>(7L, 40L));
+
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * Builds the six default edges, all without a value.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the default edge data set
+	 */
+	public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Builds the nine vertices of the tie scenario: vertices 1 to 4 start with label 10,
+	 * vertex 5 with label 0 and vertices 6 to 9 with label 20.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the vertex data set of the tie scenario
+	 */
+	public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1L, 10L));
+		vertices.add(new Vertex<Long, Long>(2L, 10L));
+		vertices.add(new Vertex<Long, Long>(3L, 10L));
+		vertices.add(new Vertex<Long, Long>(4L, 10L));
+		vertices.add(new Vertex<Long, Long>(5L, 0L));
+		vertices.add(new Vertex<Long, Long>(6L, 20L));
+		vertices.add(new Vertex<Long, Long>(7L, 20L));
+		vertices.add(new Vertex<Long, Long>(8L, 20L));
+		vertices.add(new Vertex<Long, Long>(9L, 20L));
+
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * Builds the edges of the tie scenario; every edge, including one self-loop, targets vertex 5.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the edge data set of the tie scenario
+	 */
+	public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
new file mode 100644
index 0000000..3a97d1f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class MusicProfilesData {
+
+	/**
+	 * Private constructor to prevent instantiation; the class only holds static
+	 * data, like the other data holders in this package.
+	 */
+	private MusicProfilesData() {}
+
+	/**
+	 * Builds the default (user, song, count) triplets for the users user_1 to user_6.
+	 * NOTE(review): the third field is presumably the play count; confirm against
+	 * the consuming example.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the default triplet data set
+	 */
+	public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
+		List<Tuple3<String, String, Integer>> triplets = new ArrayList<Tuple3<String, String, Integer>>();
+
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_2", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_3", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_4", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_5", 1));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_6", 40));
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_7", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_8", 3));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_2", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_3", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_8", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_9", 1));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_10", 8));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_11", 90));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_12", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_13", 34));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_14", 17));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_6", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_8", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_12", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_13", 1));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_15", 1));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_3", 300));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_4", 4));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_5", 5));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_8", 8));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_9", 9));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_10", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_12", 12));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_13", 13));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_15", 15));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_6", "song_6", 30));
+
+		return env.fromCollection(triplets);
+	}
+
+	/**
+	 * Builds the two default mismatch records, which reference song_8 and song_15.
+	 *
+	 * @param env the execution environment that creates the data set
+	 * @return the default mismatch data set
+	 */
+	public static DataSet<String> getMismatches(ExecutionEnvironment env) {
+		List<String> errors = new ArrayList<String>();
+		errors.add("ERROR: <song_8 track_8> Sever");
+		errors.add("ERROR: <song_15 track_15> Black Trees");
+		return env.fromCollection(errors);
+	}
+
+	/**
+	 * A smaller triplet input as text, one {@code user song count} record per line.
+	 * Note that it differs from the data returned by
+	 * {@link #getUserSongTriplets(ExecutionEnvironment)}.
+	 */
+	public static final String USER_SONG_TRIPLETS = "user_1	song_1	100\n" + "user_1	song_5	200\n"
+			+ "user_2	song_1	10\n" + "user_2	song_4	20\n"
+			+ "user_3	song_2	3\n"
+			+ "user_4	song_2	1\n" + "user_4	song_3	2\n"
+			+ "user_5	song_3	30";
+
+	/** A single mismatch record as text, referencing song_5. */
+	public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
+
+	public static final String MAX_ITERATIONS = "2";
+
+	/**
+	 * One {@code user song} line per user.
+	 * NOTE(review): presumably the expected top song per user for
+	 * {@link #USER_SONG_TRIPLETS} and {@link #MISMATCHES}; confirm against the consuming test.
+	 */
+	public static final String TOP_SONGS_RESULT = "user_1	song_1\n" +
+								"user_2	song_4\n" +
+								"user_3	song_2\n" +
+								"user_4	song_3\n" +
+								"user_5	song_3";
+
+	/**
+	 * One {@code user label} line per user.
+	 * NOTE(review): presumably the expected community label per user; confirm
+	 * against the consuming test.
+	 */
+	public static final String COMMUNITIES_RESULT = "user_1	1\n" +
+								"user_2	1\n" +
+								"user_3	3\n" +
+								"user_4	3\n" +
+								"user_5	4";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
new file mode 100644
index 0000000..58d4f8b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class PageRankData {
+	
+	public static final String EDGES = "2	1\n" +
+										"5	2\n" + 
+										"5	4\n" +
+										"4	3\n" +
+										"4	2\n" +
+										"1	4\n" +
+										"1	2\n" +
+										"1	3\n" +
+										"3	5\n";
+
+	
+	public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+														"2,0.248\n" + 
+														"3,0.173\n" +
+														"4,0.175\n" +
+														"5,0.165\n";
+
+	private PageRankData() {}
+
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
new file mode 100644
index 0000000..6b985c5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Default input and reference output for the Single Source Shortest Paths example program.
+ * The program falls back to the default edge data set when started without parameters.
+ */
+public class SingleSourceShortestPathsData {
+
+	public static final Long SRC_VERTEX_ID = 1L;
+
+	/** The default edges as text, one tab-separated {@code source target value} record per line. */
+	public static final String EDGES =
+			"1\t2\t12.0\n" +
+			"1\t3\t13.0\n" +
+			"2\t3\t23.0\n" +
+			"3\t4\t34.0\n" +
+			"3\t5\t35.0\n" +
+			"4\t5\t45.0\n" +
+			"5\t1\t51.0";
+
+	/** The default edges as rows of {@code {Long source, Long target, Double value}}. */
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L, 12.0},
+		new Object[]{1L, 3L, 13.0},
+		new Object[]{2L, 3L, 23.0},
+		new Object[]{3L, 4L, 34.0},
+		new Object[]{3L, 5L, 35.0},
+		new Object[]{4L, 5L, 45.0},
+		new Object[]{5L, 1L, 51.0}
+	};
+
+	/** One {@code vertexId,value} line for each of the vertices 1 to 5. */
+	public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =
+			"1,0.0\n" + "2,12.0\n" + "3,13.0\n" + "4,47.0\n" + "5,48.0";
+
+	/** Converts every row of {@link #DEFAULT_EDGES} into an edge, preserving the row order. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> defaultEdges = new LinkedList<Edge<Long, Double>>();
+		for (int i = 0; i < DEFAULT_EDGES.length; i++) {
+			final Object[] row = DEFAULT_EDGES[i];
+			final Long source = (Long) row[0];
+			final Long target = (Long) row[1];
+			final Double value = (Double) row[2];
+			defaultEdges.add(new Edge<Long, Double>(source, target, value));
+		}
+		return env.fromCollection(defaultEdges);
+	}
+
+	private SingleSourceShortestPathsData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
new file mode 100644
index 0000000..5b2cc3d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Triangle Count test program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class TriangleCountData {
+
+	public static final String EDGES = "1	2\n"+"1	3\n"+"2	3\n"+"2	6\n"+"3	4\n"+"3	5\n"+"3	6\n"+"4	5\n"+"6	7\n";
+
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+
+	private TriangleCountData () {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
new file mode 100755
index 0000000..5a8e97a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The base class for the third and last step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <K> the vertex ID type
+ * @param <VV> the vertex value type
+ * @param <M> the input type (produced by the Sum phase)
+ */
+@SuppressWarnings("serial")
+public abstract class ApplyFunction<K, VV, M> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+	/** Package-private: overwrites the value reported by {@link #getNumberOfVertices()}. */
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per superstep, after the {@link SumFunction}
+	 * in a {@link GatherSumApplyIteration}.
+	 * It updates the Vertex values; an implementation emits the new value by calling {@link #setResult}.
+	 *
+	 * @param newValue the value computed during the current superstep.
+	 * @param currentValue the current Vertex value.
+	 */
+	public abstract void apply(M newValue, VV currentValue);
+
+	/**
+	 * Writes the result into field {@code f1} of the vertex given to {@link #setOutput} and emits that vertex.
+	 *
+	 * @param result the result of the apply phase
+	 */
+	public void setResult(VV result) {
+		outVal.f1 = result;
+		out.collect(outVal);
+	}
+
+	/**
+	 * This method is executed once per superstep before the apply function is invoked for each vertex.
+	 * The default implementation does nothing; overrides cannot throw checked exceptions because
+	 * none are declared. NOTE(review): whether a failure here fails the superstep depends on the caller.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * This method is executed once per superstep after the apply function has been invoked for each vertex.
+	 * The default implementation does nothing; overrides cannot throw checked exceptions because
+	 * none are declared. NOTE(review): whether a failure here fails the superstep depends on the caller.
+	 */
+	public void postSuperstep() {}
+
+	/**
+	 * Gets the number of the superstep, starting at {@code 1}.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+	/** Backs the superstep, aggregator and broadcast accessors; null until {@link #init} is called. */
+	private IterationRuntimeContext runtimeContext;
+	/** Collector that {@link #setResult} emits to; null until {@link #setOutput} is called. */
+	private Collector<Vertex<K, VV>> out;
+	/** Vertex whose {@code f1} field {@link #setResult} overwrites before emitting it. */
+	private Vertex<K, VV> outVal;
+	/** Stores the runtime context that the superstep, aggregator and broadcast accessors delegate to. */
+	public void init(IterationRuntimeContext iterationRuntimeContext) {
+		this.runtimeContext = iterationRuntimeContext;
+	}
+	/** Sets the vertex to update and the collector to emit it to for subsequent {@link #setResult} calls. */
+	public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
+		this.out = out;
+		this.outVal = vertex;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
new file mode 100644
index 0000000..8d24f16
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.IterationConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A GSAConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link org.apache.flink.graph.gsa.GatherFunction}, {@link org.apache.flink.graph.gsa.SumFunction} as well as
+ * {@link org.apache.flink.graph.gsa.ApplyFunction}.
+ *
+ * <p>The GSAConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runGatherSumApplyIteration(org.apache.flink.graph.gsa.GatherFunction,
+ * org.apache.flink.graph.gsa.SumFunction, org.apache.flink.graph.gsa.ApplyFunction, int)}.
+ */
+public class GSAConfiguration extends IterationConfiguration {
+
+	/** the broadcast variables for the gather function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the sum function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the apply function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>();
+	/** the edge direction used to select neighbors; defaults to {@link EdgeDirection#OUT} **/
+	private EdgeDirection direction = EdgeDirection.OUT;
+	/** Creates a configuration with no broadcast sets and direction {@link EdgeDirection#OUT}. */
+	public GSAConfiguration() {}
+
+	/**
+	 * Adds a data set as a broadcast set to the gather function.
+	 *
+	 * @param name The name under which the broadcast data is available in the gather function.
+	 * @param data The data set to be broadcast.
+	 */
+	public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) {
+		this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the sum function.
+	 *
+	 * @param name The name under which the broadcast data is available in the sum function.
+	 * @param data The data set to be broadcast.
+	 */
+	public void addBroadcastSetForSumFunction(String name, DataSet<?> data) {
+		this.bcVarsSum.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the apply function.
+	 *
+	 * @param name The name under which the broadcast data is available in the apply function.
+	 * @param data The data set to be broadcast.
+	 */
+	public void addBroadcastSetForApplyFunction(String name, DataSet<?> data) {
+		this.bcVarsApply.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Get the broadcast variables of the GatherFunction.
+	 * The returned list is this configuration's internal list, not a copy.
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getGatherBcastVars() {
+		return this.bcVarsGather;
+	}
+
+	/**
+	 * Get the broadcast variables of the SumFunction.
+	 * The returned list is this configuration's internal list, not a copy.
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getSumBcastVars() {
+		return this.bcVarsSum;
+	}
+
+	/**
+	 * Get the broadcast variables of the ApplyFunction.
+	 * The returned list is this configuration's internal list, not a copy.
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getApplyBcastVars() {
+		return this.bcVarsApply;
+	}
+
+	/**
+	 * Gets the direction from which the neighbors are to be selected.
+	 * By default ({@link EdgeDirection#OUT}) the neighbors who are target of the edges are selected.
+	 *
+	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	/**
+	 * Sets the direction in which neighbors are to be selected.
+	 * By default ({@link EdgeDirection#OUT}) the neighbors who are target of the edges are selected.
+	 *
+	 * @param direction - IN, OUT or ALL
+	 */
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
new file mode 100755
index 0000000..563b20e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The base class for the first step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
+@SuppressWarnings("serial")
+public abstract class GatherFunction<VV, EV, M> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+	/** Package-private: overwrites the value reported by {@link #getNumberOfVertices()}. */
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per superstep, for each {@link Neighbor} of each Vertex
+	 * in the beginning of each superstep in a {@link GatherSumApplyIteration}.
+	 * It needs to produce a partial value, which will be combined with other partial values
+	 * in the next phase of the iteration.
+	 *
+	 * @param neighbor the input Neighbor. It provides access to the source Vertex and the Edge objects.
+	 * @return a partial result to be combined in the Sum phase.
+	 */
+	public abstract M gather(Neighbor<VV, EV> neighbor);
+
+	/**
+	 * This method is executed once per superstep before the gather function is invoked for each neighbor.
+	 * The default implementation does nothing; overrides cannot throw checked exceptions because
+	 * none are declared. NOTE(review): whether a failure here fails the superstep depends on the caller.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * This method is executed once per superstep after the gather function has been invoked for each neighbor.
+	 * The default implementation does nothing; overrides cannot throw checked exceptions because
+	 * none are declared. NOTE(review): whether a failure here fails the superstep depends on the caller.
+	 */
+	public void postSuperstep() {}
+
+	/**
+	 * Gets the number of the superstep, starting at {@code 1}.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+	/** Backs the superstep, aggregator and broadcast accessors; null until {@link #init} is called. */
+	private IterationRuntimeContext runtimeContext;
+	/** Stores the runtime context that the superstep, aggregator and broadcast accessors delegate to. */
+	public void init(IterationRuntimeContext iterationRuntimeContext) {
+		this.runtimeContext = iterationRuntimeContext;
+	}
+}