You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:09 UTC
[03/10] flink git commit: [FLINK-2857] [gelly] Improve Gelly API and
documentation. - Improve javadocs of Graph creation methods - Add fromTuple2
creation methods - Rename mapper parameters to vertexInitializer. - Improve
javadocs and parameter names of [...] (subject line truncated by the mail archive)
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
index b7e3385..2a10bd1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
@@ -90,11 +91,10 @@ public class EuclideanGraphWeighing implements ProgramDescription {
});
Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
- new MapFunction<Tuple2<Double, Double>, Double>() {
+ new EdgeJoinFunction<Double, Double>() {
- @Override
- public Double map(Tuple2<Double, Double> distance) throws Exception {
- return distance.f1;
+ public Double edgeJoin(Double edgeValue, Double inputValue) {
+ return inputValue;
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index 0f84dbb..5fb75e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
import java.util.HashSet;
@@ -88,12 +89,11 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
// join with the vertices to update the node values
Graph<Long, HashSet<Long>, Double> graphWithVertexValues =
- graph.joinWithVertices(computedNeighbors, new MapFunction<Tuple2<HashSet<Long>, HashSet<Long>>,
+ graph.joinWithVertices(computedNeighbors, new VertexJoinFunction<HashSet<Long>,
HashSet<Long>>() {
- @Override
- public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> tuple2) throws Exception {
- return tuple2.f1;
+ public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) {
+ return inputValue;
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index e347bc5..297dce2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -37,6 +37,7 @@ import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.example.utils.MusicProfilesData;
import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;
@@ -149,9 +150,9 @@ public class MusicProfiles implements ProgramDescription {
DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
.joinWithVertices(idsWithInitialLabels,
- new MapFunction<Tuple2<Long, Long>, Long>() {
- public Long map(Tuple2<Long, Long> value) {
- return value.f1;
+ new VertexJoinFunction<Long, Long>() {
+ public Long vertexJoin(Long vertexValue, Long inputValue) {
+ return inputValue;
}
}).run(new LabelPropagation<String, NullValue>(maxIterations));
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 1694205..43a5e5c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -18,9 +18,9 @@
package org.apache.flink.graph.library;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
@@ -85,7 +85,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
@@ -144,9 +144,10 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
}
@SuppressWarnings("serial")
- private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
+ private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+
+ public Double edgeJoin(Double edgeValue, Long inputValue) {
+ return edgeValue / (double) inputValue;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 76d170d..1eafce2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -31,6 +31,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.types.NullValue;
import java.util.TreeMap;
@@ -143,12 +144,13 @@ public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
}
@SuppressWarnings("serial")
- private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
- TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+ private static final class AttachValues<K> implements VertexJoinFunction<TreeMap<K, Integer>,
+ TreeMap<K, Integer>> {
@Override
- public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
- return tuple2.f1;
+ public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> vertexValue,
+ TreeMap<K, Integer> inputValue) {
+ return inputValue;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index c0e22c4..dfbe14b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -18,10 +18,10 @@
package org.apache.flink.graph.library;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
@@ -86,7 +86,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
new RankMessenger<K>(numberOfVertices), maxIterations)
@@ -149,9 +149,10 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
}
@SuppressWarnings("serial")
- private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
+ private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+
+ public Double edgeJoin(Double edgeValue, Long inputValue) {
+ return edgeValue / (double) inputValue;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index 22a5151..20f4454 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -151,6 +152,59 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
compareResultAsText(result, expectedResult);
}
+ @Test
+ public void testFromTuple2() throws Exception {
+ /*
+ * Test graph creation with fromTuple2DataSet
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<Long, Long>> edges = TestGraphUtils.getLongLongTuple2Data(env);
+
+ Graph<Long, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
+
+ List<Vertex<Long, NullValue>> result = graph.getVertices().collect();
+
+ expectedResult = "1,(null)\n" +
+ "2,(null)\n" +
+ "3,(null)\n" +
+ "4,(null)\n" +
+ "6,(null)\n" +
+ "10,(null)\n" +
+ "20,(null)\n" +
+ "30,(null)\n" +
+ "40,(null)\n" +
+ "60,(null)\n";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testFromTuple2WithMapper() throws Exception {
+ /*
+ * Test graph creation with fromTuple2DataSet with vertex initializer
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<Long, Long>> edges = TestGraphUtils.getLongLongTuple2Data(env);
+
+ Graph<Long, String, NullValue> graph = Graph.fromTuple2DataSet(edges,
+ new BooMapper(), env);
+
+ List<Vertex<Long, String>> result = graph.getVertices().collect();
+
+ expectedResult = "1,boo\n" +
+ "2,boo\n" +
+ "3,boo\n" +
+ "4,boo\n" +
+ "6,boo\n" +
+ "10,boo\n" +
+ "20,boo\n" +
+ "30,boo\n" +
+ "40,boo\n" +
+ "60,boo\n";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
@SuppressWarnings("serial")
private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
public Long map(Long vertexId) {
@@ -171,4 +225,9 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
return dummyValue;
}
}
+
+ @SuppressWarnings("serial")
+ private static final class BooMapper implements MapFunction<Long, String> {
+ public String map(Long value) { return "boo"; }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
index e406ce2..e0bc35a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
@@ -462,9 +463,10 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f0 + tuple.f1;
+ private static final class AddValuesMapper implements EdgeJoinFunction<Long, Long> {
+
+ public Long edgeJoin(Long edgeValue, Long inputValue) throws Exception {
+ return edgeValue + inputValue;
}
}
@@ -477,29 +479,34 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
- public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
- if(tuple.f1) {
- return tuple.f0 * 2;
+ private static final class DoubleIfTrueMapper implements EdgeJoinFunction<Long, Boolean> {
+
+ public Long edgeJoin(Long edgeValue, Boolean inputValue) {
+ if(inputValue) {
+ return edgeValue * 2;
}
else {
- return tuple.f0;
+ return edgeValue;
}
- }
+ }
}
@SuppressWarnings("serial")
- private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f1 * 2;
- }
+ private static final class DoubleValueMapper implements EdgeJoinFunction<Long, Long> {
+
+ public Long edgeJoin(Long edgeValue, Long inputValue) {
+ return inputValue*2;
+ }
}
@SuppressWarnings("serial")
- private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
- public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
- return (long) tuple.f1.getIntField();
- }
+ private static final class CustomValueMapper implements EdgeJoinFunction<
+ Long, DummyCustomParameterizedType<Float>> {
+
+ public Long edgeJoin(Long edgeValue,
+ DummyCustomParameterizedType<Float> inputValue) {
+ return (long) inputValue.getIntField();
+ }
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
index 22a5535..7a25788 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -173,9 +174,10 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f0 + tuple.f1;
+ private static final class AddValuesMapper implements VertexJoinFunction<Long, Long> {
+
+ public Long vertexJoin(Long vertexValue, Long inputValue) {
+ return vertexValue + inputValue;
}
}
@@ -187,28 +189,32 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
- public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
- if(tuple.f1) {
- return tuple.f0 * 2;
+ private static final class DoubleIfTrueMapper implements VertexJoinFunction<Long, Boolean> {
+
+ public Long vertexJoin(Long vertexValue, Boolean inputValue) {
+ if(inputValue) {
+ return vertexValue * 2;
}
else {
- return tuple.f0;
+ return vertexValue;
}
- }
+ }
}
@SuppressWarnings("serial")
- private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
- public Long map(Tuple2<Long, Long> tuple) throws Exception {
- return tuple.f1;
- }
+ private static final class ProjectSecondMapper implements VertexJoinFunction<Long, Long> {
+
+ public Long vertexJoin(Long vertexValue, Long inputValue) {
+ return inputValue;
+ }
}
@SuppressWarnings("serial")
- private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
- public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
- return (long) tuple.f1.getIntField();
- }
+ private static final class CustomValueMapper implements VertexJoinFunction<Long,
+ DummyCustomParameterizedType<Float>> {
+
+ public Long vertexJoin(Long vertexValue, DummyCustomParameterizedType<Float> inputValue) {
+ return (long) inputValue.getIntField();
+ }
}
}