You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/01 21:55:04 UTC
flink git commit: [FLINK-2663] [gelly] Updated Gelly library methods
to use generic key types
Repository: flink
Updated Branches:
refs/heads/master bbd97354b -> 9f7110748
[FLINK-2663] [gelly] Updated Gelly library methods to use generic key types
This squashes the following commits:
[gelly] Added missing Javadocs to GSA classes
[FLINK-2663] [gelly] Updated Gelly library methods to also use generic vertex/edge values where possible
This closes #1152
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f711074
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f711074
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f711074
Branch: refs/heads/master
Commit: 9f7110748434d2bdea96f16c70f4f0894261ae69
Parents: bbd9735
Author: vasia <va...@apache.org>
Authored: Sun Sep 20 21:20:11 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Oct 1 21:22:35 2015 +0200
----------------------------------------------------------------------
.../graph/example/ConnectedComponents.java | 2 +-
.../flink/graph/example/MusicProfiles.java | 3 +-
.../apache/flink/graph/gsa/ApplyFunction.java | 15 +++
.../apache/flink/graph/gsa/GatherFunction.java | 16 +++
.../org/apache/flink/graph/gsa/SumFunction.java | 18 +++-
.../flink/graph/library/CommunityDetection.java | 53 ++++++----
.../graph/library/ConnectedComponents.java | 32 +++---
.../graph/library/GSAConnectedComponents.java | 22 ++--
.../apache/flink/graph/library/GSAPageRank.java | 8 +-
.../library/GSASingleSourceShortestPaths.java | 8 +-
.../flink/graph/library/GSATriangleCount.java | 102 +++++++++----------
.../flink/graph/library/LabelPropagation.java | 12 ++-
.../apache/flink/graph/library/PageRank.java | 8 +-
.../library/SingleSourceShortestPaths.java | 8 +-
.../flink/graph/utils/NullValueEdgeMapper.java | 32 ++++++
.../apache/flink/graph/gsa/GSACompilerTest.java | 1 -
.../flink/graph/gsa/GSATranslationTest.java | 1 -
.../flink/graph/test/GatherSumApplyITCase.java | 8 +-
.../test/library/CommunityDetectionITCase.java | 4 +-
...ctedComponentsWithRandomisedEdgesITCase.java | 3 +-
.../test/library/LabelPropagationITCase.java | 8 +-
.../graph/test/library/PageRankITCase.java | 8 +-
.../graph/test/library/TriangleCountITCase.java | 6 +-
.../test/operations/GraphOperationsITCase.java | 1 -
24 files changed, 242 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index bd08190..4189602 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -69,7 +69,7 @@ public class ConnectedComponents implements ProgramDescription {
}, env);
DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
- .run(new GSAConnectedComponents(maxIterations)).getVertices();
+ .run(new GSAConnectedComponents<Long, NullValue>(maxIterations));
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a56224d..e347bc5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -153,8 +153,7 @@ public class MusicProfiles implements ProgramDescription {
public Long map(Tuple2<Long, Long> value) {
return value.f1;
}
- }).run(new LabelPropagation<String>(maxIterations))
- .getVertices();
+ }).run(new LabelPropagation<String, NullValue>(maxIterations));
if (fileOutput) {
verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ed0cf70..5a8e97a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -27,6 +27,13 @@ import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Collection;
+/**
+ * The base class for the third and last step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <K> the vertex ID type
+ * @param <VV> the vertex value type
+ * @param <M> the input type (produced by the Sum phase)
+ */
@SuppressWarnings("serial")
public abstract class ApplyFunction<K, VV, M> implements Serializable {
@@ -51,6 +58,14 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
//---------------------------------------------------------------------------------------------
+ /**
+ * This method is invoked once per superstep, after the {@link SumFunction}
+ * in a {@link GatherSumApplyIteration}.
+ * It updates the Vertex values.
+ *
+ * @param newValue the value computed during the current superstep.
+ * @param currentValue the current Vertex value.
+ */
public abstract void apply(M newValue, VV currentValue);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 5a09a5a..563b20e 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
import java.io.Serializable;
import java.util.Collection;
+/**
+ * The base class for the first step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
@SuppressWarnings("serial")
public abstract class GatherFunction<VV, EV, M> implements Serializable {
@@ -49,6 +56,15 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
//---------------------------------------------------------------------------------------------
+ /**
+ * This method is invoked once per superstep for each {@link Neighbor} of each Vertex,
+ * at the beginning of the superstep in a {@link GatherSumApplyIteration}.
+ * It needs to produce a partial value, which will be combined with other partial values
+ * in the next phase of the iteration.
+ *
+ * @param neighbor the input Neighbor. It provides access to the source Vertex and the Edge objects.
+ * @return a partial result to be combined in the Sum phase.
+ */
public abstract M gather(Neighbor<VV, EV> neighbor);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 69baae4..f27e275 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
import java.io.Serializable;
import java.util.Collection;
+/**
+ * The base class for the second step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
@SuppressWarnings("serial")
public abstract class SumFunction<VV, EV, M> implements Serializable {
@@ -48,7 +55,16 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
}
//---------------------------------------------------------------------------------------------
-
+ /**
+ * This method is invoked once per superstep, after the {@link GatherFunction}
+ * in a {@link GatherSumApplyIteration}.
+ * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
+ * in pairs, until a single value has been computed.
+ *
+ * @param arg0 the first partial value.
+ * @param arg1 the second partial value.
+ * @return the combined value.
+ */
public abstract M sum(M arg0, M arg1);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 31488ee..0dd39fc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -19,6 +19,8 @@
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
@@ -34,18 +36,21 @@ import java.util.TreeMap;
/**
* Community Detection Algorithm.
*
- * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
* The vertices propagate their labels and max scores in iterations, each time adopting the label with the
* highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
* delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
*
* The algorithm converges when vertices no longer update their value or when the maximum number of iterations
* is reached.
+ *
+ * @param <K> the Vertex ID type
*
* @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
*/
-public class CommunityDetection implements
- GraphAlgorithm<Long, Long, Double, Graph<Long, Long, Double>> {
+public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
private Integer maxIterations;
@@ -58,20 +63,22 @@ public class CommunityDetection implements
}
@Override
- public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+ public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
- Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+ DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
+ .map(new AddScoreToVertexValuesMapper<K>());
- Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
- .mapVertices(new AddScoreToVertexValuesMapper());
+ Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
+ Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
- return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
- new LabelMessenger(), maxIterations)
- .mapVertices(new RemoveScoreFromVertexValuesMapper());
+ return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
+ new LabelMessenger<K>(), maxIterations)
+ .mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
}
@SuppressWarnings("serial")
- public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+ public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
+ K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
private Double delta;
@@ -80,7 +87,7 @@ public class CommunityDetection implements
}
@Override
- public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
+ public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
// we would like these two maps to be ordered
@@ -140,34 +147,36 @@ public class CommunityDetection implements
}
@SuppressWarnings("serial")
- public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+ public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
Tuple2<Long, Double>, Double> {
@Override
- public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+ public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
- for(Edge<Long, Double> edge : getEdges()) {
+ for(Edge<K, Double> edge : getEdges()) {
sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
vertex.getValue().f1 * edge.getValue()));
}
-
}
}
@SuppressWarnings("serial")
- public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+ @ForwardedFields("f0")
+ public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
+ Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
- @Override
- public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
- return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+ public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
+ return new Vertex<K, Tuple2<Long, Double>>(
+ vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
}
}
@SuppressWarnings("serial")
- public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+ public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
+ Vertex<K, Tuple2<Long, Double>>, Long> {
@Override
- public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+ public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
return vertex.getValue().f0;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 871f315..ed853fe 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -18,27 +18,32 @@
package org.apache.flink.graph.library;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
import org.apache.flink.types.NullValue;
/**
- * A vertex-centric implementation of the Connected components algorithm.
+ * A vertex-centric implementation of the Connected Components algorithm.
*
- * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their
- * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs,
* provided that the value is less than the current minimum.
*
* The algorithm converges when vertices no longer update their value or when the maximum number of iterations
* is reached.
+ *
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see org.apache.flink.graph.library.GSAConnectedComponents
*/
@SuppressWarnings("serial")
-public class ConnectedComponents implements
- GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
private Integer maxIterations;
@@ -47,21 +52,24 @@ public class ConnectedComponents implements
}
@Override
- public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
- Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+ Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+ .getUndirected();
// initialize vertex values and run the Vertex Centric Iteration
- return undirectedGraph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations);
+ return undirectedGraph.runVertexCentricIteration(
+ new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
+ .getVertices();
}
/**
* Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
*/
- public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+ public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
@Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
+ public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
long min = Long.MAX_VALUE;
for (long msg : messages) {
@@ -78,10 +86,10 @@ public class ConnectedComponents implements
/**
* Distributes the minimum ID associated with a given vertex among all the target vertices.
*/
- public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
+ public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
@Override
- public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
+ public void sendMessages(Vertex<K, Long> vertex) throws Exception {
// send current minimum to neighbors
sendMessageToAllNeighbors(vertex.getValue());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index f852f3e..77bc2cf 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -18,19 +18,25 @@
package org.apache.flink.graph.library;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
import org.apache.flink.types.NullValue;
/**
* This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see org.apache.flink.graph.library.ConnectedComponents
*/
-public class GSAConnectedComponents implements
- GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
private Integer maxIterations;
@@ -39,13 +45,15 @@ public class GSAConnectedComponents implements
}
@Override
- public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
- Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+ Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+ .getUndirected();
// initialize vertex values and run the Vertex Centric Iteration
- return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(),
- maxIterations);
+ return undirectedGraph.runGatherSumApplyIteration(
+ new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(),
+ maxIterations).getVertices();
}
// --------------------------------------------------------------------------------------------
@@ -69,7 +77,7 @@ public class GSAConnectedComponents implements
};
@SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+ private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 6ce2ed6..df3e89a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
@@ -36,7 +37,7 @@ import org.apache.flink.graph.gsa.SumFunction;
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private double beta;
private int maxIterations;
@@ -58,7 +59,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
@@ -70,7 +71,8 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
- new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
+ new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
+ .getVertices();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 18bdd1d..5a76072 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
@@ -31,7 +32,7 @@ import org.apache.flink.graph.gsa.Neighbor;
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
*/
public class GSASingleSourceShortestPaths<K> implements
- GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+ GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private final K srcVertexId;
private final Integer maxIterations;
@@ -42,11 +43,12 @@ public class GSASingleSourceShortestPaths<K> implements
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
- new UpdateDistance<K>(), maxIterations);
+ new UpdateDistance<K>(), maxIterations)
+ .getVertices();
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 3d4d902..76d170d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.ReduceNeighborsFunction;
@@ -46,62 +45,61 @@ import java.util.TreeMap;
*
* This implementation is non - iterative.
*
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
- * Tuple1 which contains a single integer representing the number of triangles.
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
+ * which contains a single integer representing the number of triangles.
*/
-public class GSATriangleCount implements
- GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
+public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
+ GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
@SuppressWarnings("serial")
@Override
- public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
+ public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
ExecutionEnvironment env = input.getContext();
// order the edges so that src is always higher than trg
- DataSet<Edge<Long, NullValue>> edges = input.getEdges()
- .map(new OrderEdges()).distinct();
+ DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
- Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges,
- new VertexInitializer(), env);
+ Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
+ new VertexInitializer<K>(), env);
// select neighbors with ids higher than the current vertex id
// Gather: a no-op in this case
// Sum: create the set of neighbors
- DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors =
- graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+ DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
+ graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
- Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues =
- graph.mapVertices(new VertexInitializerEmptyTreeMap());
+ Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
+ graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
// Apply: attach the computed values to the vertices
// joinWithVertices to update the node values
- DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors =
- graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices();
+ DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
+ graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
- Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+ Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
edges, env);
// propagate each received value to neighbors with higher id
// Gather: a no-op in this case
// Sum: propagate values
- DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors
- .reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+ DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
+ .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
// Apply: attach propagated values to vertices
- DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues =
- graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices();
+ DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
+ graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
- Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors =
+ Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
// Scatter: compute the number of triangles
- DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
- .map(new ComputeTriangles()).reduce(new ReduceFunction<Tuple1<Integer>>() {
+ DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
+ .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
@Override
- public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception {
- return new Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
+ public Integer reduce(Integer first, Integer second) throws Exception {
+ return first + second;
}
});
@@ -109,24 +107,25 @@ public class GSATriangleCount implements
}
@SuppressWarnings("serial")
- private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> {
+ private static final class OrderEdges<K extends Comparable<K>, EV> implements
+ MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
@Override
- public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception {
- if (edge.getSource() < edge.getTarget()) {
- return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+ public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
+ if (edge.getSource().compareTo(edge.getTarget()) < 0) {
+ return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
} else {
- return edge;
+ return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
}
}
}
@SuppressWarnings("serial")
- private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> {
+ private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
@Override
- public TreeMap<Long, Integer> map(Long value) throws Exception {
- TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>();
+ public TreeMap<K, Integer> map(K value) throws Exception {
+ TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
neighbors.put(value, 1);
return neighbors;
@@ -134,31 +133,32 @@ public class GSATriangleCount implements
}
@SuppressWarnings("serial")
- private static final class VertexInitializerEmptyTreeMap implements
- MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+ private static final class VertexInitializerEmptyTreeMap<K> implements
+ MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
@Override
- public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception {
- return new TreeMap<Long, Integer>();
+ public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
+ return new TreeMap<K, Integer>();
}
}
@SuppressWarnings("serial")
- private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>,
- TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+ private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
+ TreeMap<K, Integer>>, TreeMap<K, Integer>> {
@Override
- public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
+ public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
return tuple2.f1;
}
}
@SuppressWarnings("serial")
- private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> {
+ private static final class GatherHigherIdNeighbors<K> implements
+ ReduceNeighborsFunction<TreeMap<K,Integer>> {
@Override
- public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
- for (Long key : second.keySet()) {
+ public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
+ for (K key : second.keySet()) {
Integer value = first.get(key);
if (value != null) {
first.put(key, value + second.get(key));
@@ -171,20 +171,20 @@ public class GSATriangleCount implements
}
@SuppressWarnings("serial")
- private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
- Tuple1<Integer>> {
+ private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
+ Integer> {
@Override
- public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception {
+ public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
- Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex();
- Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex();
+ Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
+ Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
int triangles = 0;
if(trgVertex.getValue().get(srcVertex.getId()) != null) {
- triangles=trgVertex.getValue().get(srcVertex.getId());
+ triangles = trgVertex.getValue().get(srcVertex.getId());
}
- return new Tuple1<Integer>(triangles);
+ return triangles;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2dd2180..82dfee7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -18,12 +18,14 @@
package org.apache.flink.graph.library;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
import org.apache.flink.types.NullValue;
import java.util.HashMap;
@@ -42,8 +44,8 @@ import java.util.Map.Entry;
*/
@SuppressWarnings("serial")
-public class LabelPropagation<K extends Comparable<K>> implements
- GraphAlgorithm<K, Long, NullValue, Graph<K, Long, NullValue>> {
+public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
+ DataSet<Vertex<K, Long>>> {
private final int maxIterations;
@@ -52,12 +54,12 @@ public class LabelPropagation<K extends Comparable<K>> implements
}
@Override
- public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
// iteratively adopt the most frequent label among the neighbors
// of each vertex
- return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
- maxIterations);
+ return input.mapEdges(new NullValueEdgeMapper<K, EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+ maxIterations).getVertices();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 47f7acd..8193dba 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -37,8 +37,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
*
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
-public class PageRank<K> implements
- GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private double beta;
private int maxIterations;
@@ -66,7 +65,7 @@ public class PageRank<K> implements
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
@@ -78,7 +77,8 @@ public class PageRank<K> implements
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
- new RankMessenger<K>(numberOfVertices), maxIterations);
+ new RankMessenger<K>(numberOfVertices), maxIterations)
+ .getVertices();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 1911f73..60c4c17 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.library;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
@@ -31,8 +32,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
* This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
*/
@SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements
- GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private final K srcVertexId;
private final Integer maxIterations;
@@ -43,11 +43,11 @@ public class SingleSourceShortestPaths<K> implements
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
- maxIterations);
+ maxIterations).getVertices();
}
public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
new file mode 100644
index 0000000..2bd4719
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ public NullValue map(Edge<K, EV> edge) {
+ return NullValue.getInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
index 7a66639..2ad203f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -56,7 +56,6 @@ public class GSACompilerTest extends CompilerTestBase {
env.setParallelism(DEFAULT_PARALLELISM);
// compose test program
{
- @SuppressWarnings("unchecked")
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
index 0a7b1c7..ced7508 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -71,7 +71,6 @@ public class GSATranslationTest {
// ------------ construct the test program ------------------
{
- @SuppressWarnings("unchecked")
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 4b381b6..0213f02 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -55,8 +55,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
new InitMapperCC(), env);
- List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16))
- .getVertices().collect();
+ List<Vertex<Long, Long>> result = inputGraph.run(
+ new GSAConnectedComponents<Long, NullValue>(16)).collect();
expectedResult = "1,1\n" +
"2,1\n" +
@@ -78,8 +78,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
new InitMapperSSSP(), env);
- List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
- .getVertices().collect();
+ List<Vertex<Long, Double>> result = inputGraph.run(
+ new GSASingleSourceShortestPaths<Long>(1l, 16)).collect();
expectedResult = "1,0.0\n" +
"2,12.0\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
index 104996e..421eaa9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -50,7 +50,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
- List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
.getVertices().collect();
expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
@@ -66,7 +66,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
- List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
.getVertices().collect();
expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
compareResultAsTuples(result, expected);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
index ef4b467..9eb7387 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -60,8 +60,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
- DataSet<Vertex<Long, Long>> result = graph
- .run(new ConnectedComponents(100)).getVertices();
+ DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100));
result.writeAsCsv(resultPath, "\n", " ");
env.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
index da36ef6..8785b0d 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -51,8 +51,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase {
LabelPropagationData.getDefaultVertexSet(env),
LabelPropagationData.getDefaultEdgeDataSet(env), env);
- List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
- .getVertices().collect();
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1))
+ .collect();
expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
compareResultAsTuples(result, expectedResult);
@@ -69,8 +69,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase {
LabelPropagationData.getTieVertexSet(env),
LabelPropagationData.getTieEdgeDataSet(env), env);
- List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
- .getVertices().collect();
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1))
+ .collect();
expectedResult = LabelPropagationData.LABELS_WITH_TIE;
compareResultAsTuples(result, expectedResult);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index cc0327f..94c7713 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -51,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
- .getVertices().collect();
+ .collect();
compareWithDelta(result, expectedResult, 0.01);
}
@@ -64,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
- .getVertices().collect();
+ .collect();
compareWithDelta(result, expectedResult, 0.01);
}
@@ -77,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
- .getVertices().collect();
+ .collect();
compareWithDelta(result, expectedResult, 0.01);
}
@@ -90,7 +90,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
- .getVertices().collect();
+ .collect();
compareWithDelta(result, expectedResult, 0.01);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
index 047bbf7..1d9ab9f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -19,12 +19,12 @@
package org.apache.flink.graph.test.library;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.example.utils.TriangleCountData;
import org.apache.flink.graph.library.GSATriangleCount;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.NullValue;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -48,9 +48,9 @@ public class TriangleCountITCase extends MultipleProgramsTestBase {
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
env).getUndirected();
- List<Tuple1<Integer>> numberOfTriangles = graph.run(new GSATriangleCount()).collect();
+ List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
- compareResultAsTuples(numberOfTriangles, expectedResult);
+ Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index a06f881..ffc9da9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -326,7 +326,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expectedResult);
}
- @SuppressWarnings("unchecked")
@Test
public void testDifference2() throws Exception {
/*