You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:09 UTC

[03/10] flink git commit: [FLINK-2857] [gelly] Improve Gelly API and documentation.
- Improve javadocs of Graph creation methods
- Add fromTuple2 creation methods
- Rename mapper parameters to vertexInitializer
- Improve javadocs and parameter names of … (subject line truncated in the archive)

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
index b7e3385..2a10bd1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.Vertex;
@@ -90,11 +91,10 @@ public class EuclideanGraphWeighing implements ProgramDescription {
 				});
 
 		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
-				new MapFunction<Tuple2<Double, Double>, Double>() {
+				new EdgeJoinFunction<Double, Double>() {
 
-					@Override
-					public Double map(Tuple2<Double, Double> distance) throws Exception {
-						return distance.f1;
+					public Double edgeJoin(Double edgeValue, Double inputValue) {
+						return inputValue;
 					}
 				});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index 0f84dbb..5fb75e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.VertexJoinFunction;
 import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
 
 import java.util.HashSet;
@@ -88,12 +89,11 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 
 		// join with the vertices to update the node values
 		Graph<Long, HashSet<Long>, Double> graphWithVertexValues =
-				graph.joinWithVertices(computedNeighbors, new MapFunction<Tuple2<HashSet<Long>, HashSet<Long>>,
+				graph.joinWithVertices(computedNeighbors, new VertexJoinFunction<HashSet<Long>,
 						HashSet<Long>>() {
 
-					@Override
-					public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> tuple2) throws Exception {
-						return tuple2.f1;
+					public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) {
+						return inputValue;
 					}
 				});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index e347bc5..297dce2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -37,6 +37,7 @@ import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexJoinFunction;
 import org.apache.flink.graph.example.utils.MusicProfilesData;
 import org.apache.flink.graph.library.LabelPropagation;
 import org.apache.flink.types.NullValue;
@@ -149,9 +150,9 @@ public class MusicProfiles implements ProgramDescription {
 
 		DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
 				.joinWithVertices(idsWithInitialLabels,
-						new MapFunction<Tuple2<Long, Long>, Long>() {
-							public Long map(Tuple2<Long, Long> value) {
-								return value.f1;
+						new VertexJoinFunction<Long, Long>() {
+							public Long vertexJoin(Long vertexValue, Long inputValue) {
+								return inputValue;
 							}
 						}).run(new LabelPropagation<String, NullValue>(maxIterations));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 1694205..43a5e5c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.graph.library;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -85,7 +85,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
 
 		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
 				new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
@@ -144,9 +144,10 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 	}
 
 	@SuppressWarnings("serial")
-	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-		public Double map(Tuple2<Double, Long> value) {
-			return value.f0 / value.f1;
+	private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+
+		public Double edgeJoin(Double edgeValue, Long inputValue) {
+			return edgeValue / (double) inputValue;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 76d170d..1eafce2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -31,6 +31,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Triplet;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.VertexJoinFunction;
 import org.apache.flink.types.NullValue;
 
 import java.util.TreeMap;
@@ -143,12 +144,13 @@ public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
 	}
 
 	@SuppressWarnings("serial")
-	private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
-			TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+	private static final class AttachValues<K> implements VertexJoinFunction<TreeMap<K, Integer>,
+			TreeMap<K, Integer>> {
 
 		@Override
-		public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
-			return tuple2.f1;
+		public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> vertexValue,
+				TreeMap<K, Integer> inputValue) {
+			return inputValue;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index c0e22c4..dfbe14b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.graph.library;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -86,7 +86,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
 
 		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
 				new RankMessenger<K>(numberOfVertices), maxIterations)
@@ -149,9 +149,10 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 	}
 
 	@SuppressWarnings("serial")
-	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-		public Double map(Tuple2<Double, Long> value) {
-			return value.f0 / value.f1;
+	private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+
+		public Double edgeJoin(Double edgeValue, Long inputValue) {
+			return edgeValue / (double) inputValue;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index 22a5151..20f4454 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -151,6 +152,59 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		compareResultAsText(result, expectedResult);
 	}
 
+	@Test
+	public void testFromTuple2() throws Exception {
+		/*
+		 * Test graph creation with fromTuple2DataSet
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<Long, Long>> edges = TestGraphUtils.getLongLongTuple2Data(env);
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
+
+        List<Vertex<Long, NullValue>> result = graph.getVertices().collect();
+        
+		expectedResult = "1,(null)\n" +
+					"2,(null)\n" +
+					"3,(null)\n" +
+					"4,(null)\n" +
+					"6,(null)\n" +
+					"10,(null)\n" +
+					"20,(null)\n" +
+					"30,(null)\n" +
+					"40,(null)\n" +
+					"60,(null)\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testFromTuple2WithMapper() throws Exception {
+		/*
+		 * Test graph creation with fromTuple2DataSet with vertex initializer
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<Long, Long>> edges = TestGraphUtils.getLongLongTuple2Data(env);
+
+		Graph<Long, String, NullValue> graph = Graph.fromTuple2DataSet(edges,
+				new BooMapper(), env);
+
+        List<Vertex<Long, String>> result = graph.getVertices().collect();
+        
+		expectedResult = "1,boo\n" +
+					"2,boo\n" +
+					"3,boo\n" +
+					"4,boo\n" +
+					"6,boo\n" +
+					"10,boo\n" +
+					"20,boo\n" +
+					"30,boo\n" +
+					"40,boo\n" +
+					"60,boo\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
 	@SuppressWarnings("serial")
 	private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
 		public Long map(Long vertexId) {
@@ -171,4 +225,9 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 			return dummyValue;
 		}
 	}
+
+	@SuppressWarnings("serial")
+	private static final class BooMapper implements MapFunction<Long, String> {
+		public String map(Long value) {	return "boo"; }
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
index e406ce2..e0bc35a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
@@ -462,9 +463,10 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
     }
 
 	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
+	private static final class AddValuesMapper implements EdgeJoinFunction<Long, Long> {
+
+		public Long edgeJoin(Long edgeValue, Long inputValue) throws Exception {
+			return edgeValue + inputValue;
 		}
 	}
 
@@ -477,29 +479,34 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
     }
 
 	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
+	private static final class DoubleIfTrueMapper implements EdgeJoinFunction<Long, Boolean> {
+
+		public Long edgeJoin(Long edgeValue, Boolean inputValue) {
+            if(inputValue) {
+                return edgeValue * 2;
             }
             else {
-                return tuple.f0;
+                return edgeValue;
             }
-        }
+		}
     }
 
 	@SuppressWarnings("serial")
-	private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1 * 2;
-        }
+	private static final class DoubleValueMapper implements EdgeJoinFunction<Long, Long> {
+ 
+		public Long edgeJoin(Long edgeValue, Long inputValue) {
+			return inputValue*2;
+		}
     }
 
 	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
+	private static final class CustomValueMapper implements EdgeJoinFunction<
+	Long, DummyCustomParameterizedType<Float>> {
+
+		public Long edgeJoin(Long edgeValue,
+				DummyCustomParameterizedType<Float> inputValue) {
+			return (long) inputValue.getIntField();
+		}
     }
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
index 22a5535..7a25788 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexJoinFunction;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -173,9 +174,10 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
     }
 
 	@SuppressWarnings("serial")
-	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-		public Long map(Tuple2<Long, Long> tuple) throws Exception {
-			return tuple.f0 + tuple.f1;
+	private static final class AddValuesMapper implements VertexJoinFunction<Long, Long> {
+
+		public Long vertexJoin(Long vertexValue, Long inputValue) {
+			return vertexValue + inputValue;
 		}
 	}
 
@@ -187,28 +189,32 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
     }
 
 	@SuppressWarnings("serial")
-	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
-        public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
-            if(tuple.f1) {
-                return tuple.f0 * 2;
+	private static final class DoubleIfTrueMapper implements VertexJoinFunction<Long, Boolean> {
+
+		public Long vertexJoin(Long vertexValue, Boolean inputValue) {
+            if(inputValue) {
+                return vertexValue * 2;
             }
             else {
-                return tuple.f0;
+                return vertexValue;
             }
-        }
+		}
     }
 
 	@SuppressWarnings("serial")
-	private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
-        public Long map(Tuple2<Long, Long> tuple) throws Exception {
-            return tuple.f1;
-        }
+	private static final class ProjectSecondMapper implements VertexJoinFunction<Long, Long> {
+
+		public Long vertexJoin(Long vertexValue, Long inputValue) {
+			return inputValue;
+		}
     }
 
 	@SuppressWarnings("serial")
-	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
-        public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
-            return (long) tuple.f1.getIntField();
-        }
+	private static final class CustomValueMapper implements VertexJoinFunction<Long,
+		DummyCustomParameterizedType<Float>> {
+
+		public Long vertexJoin(Long vertexValue, DummyCustomParameterizedType<Float> inputValue) {
+			return (long) inputValue.getIntField();
+		}
     }
 }