You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/06/20 16:32:37 UTC

flink git commit: [FLINK-2149][gelly] Simplified Jaccard Example

Repository: flink
Updated Branches:
  refs/heads/master 9ee4fa502 -> b2be80ddc


[FLINK-2149][gelly] Simplified Jaccard Example

This PR simplifies Gelly's Jaccard example by using the more efficient reduceOnNeighbors rather than groupReduceOnNeighbors.

Author: andralungu <lu...@gmail.com>

Closes #770 from andralungu/jaccardImprovement and squashes the following commits:

6e77f8d [andralungu] [FLINK-2149][gelly] Simplified Jaccard Example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2be80dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2be80dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2be80dd

Branch: refs/heads/master
Commit: b2be80ddc162e8ed3418e2f7666a47f749bd4f2f
Parents: 9ee4fa5
Author: andralungu <lu...@gmail.com>
Authored: Sat Jun 20 16:28:59 2015 +0200
Committer: andra <an...@apache.org>
Committed: Sat Jun 20 16:28:59 2015 +0200

----------------------------------------------------------------------
 .../graph/example/JaccardSimilarityMeasure.java | 104 +++++++++----------
 1 file changed, 49 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2be80dd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index ac06bda..be241ce 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -23,16 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.EdgesFunction;
 import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
 
 import java.util.HashSet;
 
@@ -66,34 +63,45 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 
 		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
 
-		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+		Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, HashSet<Long>>() {
 
-		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
-				graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+					@Override
+					public HashSet<Long> map(Long id) throws Exception {
+						HashSet<Long> neighbors = new HashSet<Long>();
+						neighbors.add(id);
 
-		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+						return new HashSet<Long>(neighbors);
+					}
+				}, env);
 
-		// the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
-		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
-				.map(new WeighEdgesMapper());
+		// create the set of neighbors
+		DataSet<Tuple2<Long, HashSet<Long>>> computedNeighbors =
+				graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
 
-		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
-				new MapFunction<Tuple2<Double, Double>, Double>() {
+		// join with the vertices to update the node values
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues =
+				graph.joinWithVertices(computedNeighbors, new MapFunction<Tuple2<HashSet<Long>, HashSet<Long>>,
+						HashSet<Long>>() {
 
 					@Override
-					public Double map(Tuple2<Double, Double> value) throws Exception {
-						return value.f1;
+					public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> tuple2) throws Exception {
+						return tuple2.f1;
 					}
-				}).getEdges();
+				});
+
+		// compare neighbors, compute Jaccard
+		DataSet<Edge<Long, Double>> edgesWithJaccardValues =
+				graphWithVertexValues.getTriplets().map(new ComputeJaccard());
 
 		// emit result
 		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", ",");
+			edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ",");
 
 			// since file sinks are lazy, we trigger the execution explicitly
 			env.execute("Executing Jaccard Similarity Measure");
 		} else {
-			result.print();
+			edgesWithJaccardValues.print();
 		}
 
 	}
@@ -106,20 +114,14 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 	/**
 	 * Each vertex will have a HashSet containing its neighbor ids as value.
 	 */
-	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+	@SuppressWarnings("serial")
+	private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
 
 		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
-														Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
-
-			HashSet<Long> neighborsHashSet = new HashSet<Long>();
-			long vertexId = -1;
-
-			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
-				neighborsHashSet.add(getNeighborID(edge));
-				vertexId = edge.f0;
-			}
-			out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
+		public HashSet<Long> reduceNeighbors(HashSet<Long> first,
+											HashSet<Long> second) {
+			first.addAll(second);
+			return new HashSet<Long>(first);
 		}
 	}
 
@@ -134,37 +136,29 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 	 *
 	 * The Jaccard similarity coefficient is then, the intersection/union.
 	 */
-	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
-			Tuple3<Long, Long, Double>> {
+	@SuppressWarnings("serial")
+	private static final class ComputeJaccard implements
+			MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
 
 		@Override
-		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
-				throws Exception {
+		public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception {
 
-			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
-			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+			Vertex<Long, HashSet<Long>> srcVertex = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> trgVertex = triplet.getTrgVertex();
 
-			long unionPlusIntersection = source.getValue().size() + target.getValue().size();
-			// within a HashSet, all elements are distinct
-			source.getValue().addAll(target.getValue());
-			// the source value contains the union
-			long union = source.getValue().size();
-			long intersection = unionPlusIntersection - union;
+			Long x = srcVertex.getId();
+			Long y = trgVertex.getId();
+			HashSet<Long> neighborSetY = trgVertex.getValue();
 
-			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
-		}
-	}
+			double unionPlusIntersection = srcVertex.getValue().size() + neighborSetY.size();
+			// within a HashSet, all elements are distinct
+			HashSet<Long> unionSet = new HashSet<Long>();
+			unionSet.addAll(srcVertex.getValue());
+			unionSet.addAll(neighborSetY);
+			double union = unionSet.size();
+			double intersection = unionPlusIntersection - union;
 
-	/**
-	 * Helper method that extracts the neighborId given an edge.
-	 * @param edge
-	 * @return
-	 */
-	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
-		if(edge.f1.getSource() == edge.f0) {
-			return edge.f1.getTarget();
-		} else {
-			return edge.f1.getSource();
+			return new Edge<Long, Double>(x, y, intersection/union);
 		}
 	}