Posted to commits@flink.apache.org by va...@apache.org on 2015/04/10 10:29:25 UTC
[1/4] flink git commit: [FLINK-1694] [gelly] added
IterationConfiguration as a way to configure a VertexCentricIteration
Repository: flink
Updated Branches:
refs/heads/master e45f13f53 -> c518df944
[FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e98bd853
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e98bd853
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e98bd853
Branch: refs/heads/master
Commit: e98bd85384946f9c6834542a39e0eb63c2c95f15
Parents: e45f13f
Author: vasia <va...@gmail.com>
Authored: Sun Mar 29 23:39:08 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 10:53:02 2015 +0300
----------------------------------------------------------------------
docs/gelly_guide.md | 79 ++++++-
.../main/java/org/apache/flink/graph/Graph.java | 45 +++-
.../flink/graph/library/LabelPropagation.java | 6 +-
.../apache/flink/graph/library/PageRank.java | 6 +-
.../graph/library/SimpleCommunityDetection.java | 8 +-
.../library/SingleSourceShortestPaths.java | 10 +-
.../graph/spargel/IterationConfiguration.java | 192 +++++++++++++++++
.../graph/spargel/VertexCentricIteration.java | 168 ++++-----------
.../graph/spargel/VertexUpdateFunction.java | 2 +-
.../test/CollectionModeSuperstepITCase.java | 7 +-
.../test/VertexCentricConfigurationITCase.java | 213 +++++++++++++++++++
.../VertexCentricConnectedComponentsITCase.java | 4 +-
12 files changed, 565 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 9cceafc..203cffa 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -347,29 +347,86 @@ Vertex-centric Iterations
Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
-These functions are given as parameters to Gelly's `createVertexCentricIteration`, which returns a `VertexCentricIteration`.
-The user can configure this iteration (set the name, the parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method:
+These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
+This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
{% highlight java %}
Graph<Long, Double, Double> graph = ...
-// create the vertex-centric iteration
-VertexCentricIteration<Long, Double, Double, Double> iteration =
- graph.createVertexCentricIteration(
+// run Single-Source-Shortest-Paths vertex-centric iteration
+Graph<Long, Double, Double> result =
+ graph.runVertexCentricIteration(
new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+// user-defined functions
+public static final class VertexDistanceUpdater {...}
+public static final class MinDistanceMessenger {...}
+
+{% endhighlight %}
+
+### Configuring a Vertex-Centric Iteration
+A vertex-centric iteration can be configured using an `IterationConfiguration` object.
+Currently, the following parameters can be specified:
+
+* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages
+and can be specified using the `setName()` method.
+
+* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.
+
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set is kept in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+
+* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.
+
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+
+{% highlight java %}
+
+Graph<Long, Double, Double> graph = ...
+
+// configure the iteration
+IterationConfiguration parameters = new IterationConfiguration();
+
// set the iteration name
-iteration.setName("Single Source Shortest Paths");
+parameters.setName("Gelly Iteration");
// set the parallelism
-iteration.setParallelism(16);
+parameters.setParallelism(16);
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator());
-// run the computation
-graph.runVertexCentricIteration(iteration);
+// run the vertex-centric iteration, also passing the configuration parameters
+Graph<Long, Double, Double> result =
+ graph.runVertexCentricIteration(
+ new VertexUpdater(), new Messenger(), maxIterations, parameters);
// user-defined functions
-public static final class VertexDistanceUpdater {...}
-public static final class MinDistanceMessenger {...}
+public static final class VertexUpdater extends VertexUpdateFunction {
+
+ LongSumAggregator aggregator = new LongSumAggregator();
+
+ public void preSuperstep() {
+
+ // retrieve the Aggregator
+ aggregator = getIterationAggregator("sumAggregator");
+ }
+
+
+ public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator inMessages) {
+
+ //do some computation
+ Long partialValue = ...
+
+ // aggregate the partial value
+ aggregator.aggregate(partialValue);
+
+ // update the vertex value
+ setNewVertexValue(...);
+ }
+}
+
+public static final class Messenger extends MessagingFunction {...}
{% endhighlight %}
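
The aggregator description in the guide text above (values are combined once per superstep and become visible in the next one) can be sketched in plain Java. This is only an illustration with no Flink dependency: the class `SuperstepAggregateSketch`, the method `visibleAggregates`, and the choice of 0 as the value visible in the first superstep are assumptions made for this sketch, not part of the commit.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in, not Flink code: simulates "aggregate in superstep n,
// read the combined value in superstep n + 1".
class SuperstepAggregateSketch {

    /** Hypothetical helper: returns the aggregate that vertices can read in each superstep. */
    static List<Long> visibleAggregates(long[] partialValues, int supersteps) {
        List<Long> visible = new ArrayList<>();
        long previous = 0L; // assumption: nothing has been aggregated before the first superstep
        for (int s = 0; s < supersteps; s++) {
            visible.add(previous);      // value readable by vertices during superstep s
            long current = 0L;          // a fresh sum is started for every superstep
            for (long v : partialValues) {
                current += v * (s + 1); // each vertex contributes a partial value
            }
            previous = current;         // combined globally, exposed in the next superstep
        }
        return visible;
    }

    public static void main(String[] args) {
        // Three vertices with partial values 1, 2, 3 over three supersteps.
        System.out.println(visibleAggregates(new long[] {1, 2, 3}, 3));
    }
}
```

In the documented example, the write side corresponds to `aggregator.aggregate(partialValue)` inside `updateVertex`, and the read side to `getPreviousIterationAggregate(String)` on the update function.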
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index a73beaf..8280ba9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -1149,30 +1150,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
- * Create a Vertex-Centric iteration on the graph.
- *
+ * Runs a Vertex-Centric iteration on the graph.
+ * No configuration options are provided.
+
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform
- * @return
+ *
+ * @return the updated Graph after the vertex-centric iteration has converged or
+ * after maximumNumberOfIterations.
*/
- public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
+ public <M> Graph<K, VV, EV> runVertexCentricIteration(
VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations) {
- return VertexCentricIteration.withEdges(edges, vertexUpdateFunction,
- messagingFunction, maximumNumberOfIterations);
+
+ VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+ edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+ DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
+ return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
-
+
/**
- * Runs a Vertex-Centric iteration on the graph.
+ * Runs a Vertex-Centric iteration on the graph with configuration options.
+ *
+ * @param vertexUpdateFunction the vertex update function
+ * @param messagingFunction the messaging function
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ * @param parameters the iteration configuration parameters
*
- * @param iteration the Vertex-Centric iteration to run
- * @return
+ * @return the updated Graph after the vertex-centric iteration has converged or
+ * after maximumNumberOfIterations.
*/
public <M> Graph<K, VV, EV> runVertexCentricIteration(
- VertexCentricIteration<K, VV, M, EV> iteration) {
+ VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+ MessagingFunction<K, VV, M, EV> messagingFunction,
+ int maximumNumberOfIterations, IterationConfiguration parameters) {
+
+ VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+ edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+ iteration.configure(parameters);
+
DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 33a04e7..ff6fe85 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,7 +22,6 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue;
@@ -56,9 +55,8 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
// iteratively adopt the most frequent label among the neighbors
// of each vertex
- VertexCentricIteration<K, Long, Long, NullValue> iteration = input.createVertexCentricIteration(
- new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations);
- return input.runVertexCentricIteration(iteration);
+ return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+ maxIterations);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 00ae204..48c9a51 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
public class PageRank<K extends Comparable<K> & Serializable> implements
@@ -43,11 +42,8 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
final long numberOfVertices = network.numberOfVertices();
-
- VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
- new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+ return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
maxIterations);
- return network.runVertexCentricIteration(iteration);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
index fb32781..e3d3e1c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import java.util.Map;
@@ -65,11 +64,8 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
.mapVertices(new AddScoreToVertexValuesMapper());
- VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
- iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta),
- new LabelMessenger(), maxIterations);
-
- return graphWithScoredVertices.runVertexCentricIteration(iteration)
+ return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
+ new LabelMessenger(), maxIterations)
.mapVertices(new RemoveScoreFromVertexValuesMapper());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 3e9a29d..262b2c5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import java.io.Serializable;
@@ -45,12 +44,9 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
- Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId));
-
- VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration(
- new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), maxIterations);
-
- return mappedInput.runVertexCentricIteration(iteration);
+ return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+ maxIterations);
}
public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
new file mode 100644
index 0000000..f161d8d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This class is used to configure a vertex-centric iteration.
+ *
+ * An IterationConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link VertexUpdateFunction} and {@link MessagingFunction}.
+ *
+ * The IterationConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
+ * VertexUpdateFunction, MessagingFunction, int, IterationConfiguration)}.
+ *
+ */
+public class IterationConfiguration {
+
+ /** the iteration name **/
+ private String name;
+
+ /** the iteration parallelism **/
+ private int parallelism = -1;
+
+ /** the iteration aggregators **/
+ private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+
+ /** the broadcast variables for the update function **/
+ private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+ /** the broadcast variables for the messaging function **/
+ private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+ /** flag that defines whether the solution set is kept in managed memory **/
+ private boolean unmanagedSolutionSet = false;
+
+ public IterationConfiguration() {}
+
+
+ /**
+ * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+ *
+ * @param name The name for the iteration.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets the name of the vertex-centric iteration.
+ *
+ * @param defaultName The name to return if no name has been set.
+ * @return The name of the iteration, or defaultName if no name has been set.
+ */
+ public String getName(String defaultName) {
+ if (name != null) {
+ return name;
+ }
+ else {
+ return defaultName;
+ }
+ }
+
+ /**
+ * Sets the parallelism for the iteration.
+ *
+ * @param parallelism The parallelism.
+ */
+ public void setParallelism(int parallelism) {
+ Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Gets the iteration's parallelism.
+ *
+ * @return The iteration's parallelism, or -1 if not set.
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects
+ * in serialized form) or as a simple object map.
+ * By default, the solution set runs in managed memory.
+ *
+ * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
+ */
+ public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
+ this.unmanagedSolutionSet = unmanaged;
+ }
+
+ /**
+ * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping objects
+ * in serialized form) or as a simple object map.
+ * By default, the solution set runs in managed memory.
+ *
+ * @return True, if the solution set is in unmanaged memory, false otherwise.
+ */
+ public boolean isSolutionSetUnmanagedMemory() {
+ return this.unmanagedSolutionSet;
+ }
+
+ /**
+ * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+ * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+ * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+ *
+ * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
+ * @param aggregator The aggregator.
+ */
+ public void registerAggregator(String name, Aggregator<?> aggregator) {
+ this.aggregators.put(name, aggregator);
+ }
+
+ /**
+ * Adds a data set as a broadcast set to the messaging function.
+ *
+ * @param name The name under which the broadcast data is available in the messaging function.
+ * @param data The data set to be broadcasted.
+ */
+ public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+ this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+ }
+
+ /**
+ * Adds a data set as a broadcast set to the vertex update function.
+ *
+ * @param name The name under which the broadcast data is available in the vertex update function.
+ * @param data The data set to be broadcasted.
+ */
+ public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+ this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+ }
+
+ /**
+ * Gets the set of aggregators that are registered for this vertex-centric iteration.
+ *
+ * @return a Map of the registered aggregators, where the key is the aggregator name
+ * and the value is the Aggregator object
+ */
+ public Map<String, Aggregator<?>> getAggregators() {
+ return this.aggregators;
+ }
+
+ /**
+ * Get the broadcast variables of the VertexUpdateFunction.
+ *
+ * @return a List of Tuple2, where the first field is the broadcast variable name
+ * and the second field is the broadcast DataSet.
+ */
+ public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+ return this.bcVarsUpdate;
+ }
+
+ /**
+ * Get the broadcast variables of the MessagingFunction.
+ *
+ * @return a List of Tuple2, where the first field is the broadcast variable name
+ * and the second field is the broadcast DataSet.
+ */
+ public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+ return this.bcVarsMessaging;
+ }
+}
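
Two behaviours of the new class are easy to miss when reading the diff: `getName(String)` falls back to the supplied default when no name was set, and `setParallelism(int)` rejects any value other than a positive number or -1. The following is a minimal stand-in that mirrors only those two behaviours in plain Java. The class name `IterationConfigurationSketch` is hypothetical, and the explicit `IllegalArgumentException` replaces the `Validate.isTrue` call from commons-lang3 so that the sketch needs no dependencies.

```java
// Stand-in for illustration only; it is not the Flink class from this commit.
class IterationConfigurationSketch {

    private String name;          // stays null until setName() is called
    private int parallelism = -1; // -1 means "use the default parallelism"

    void setName(String name) {
        this.name = name;
    }

    /** Returns the configured name, or defaultName if none was set. */
    String getName(String defaultName) {
        return (name != null) ? name : defaultName;
    }

    void setParallelism(int parallelism) {
        // Same condition and message as in the diff, checked without commons-lang3.
        if (!(parallelism > 0 || parallelism == -1)) {
            throw new IllegalArgumentException(
                    "The parallelism must be positive, or -1 (use default).");
        }
        this.parallelism = parallelism;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        IterationConfigurationSketch parameters = new IterationConfigurationSketch();
        System.out.println(parameters.getName("Vertex-centric iteration")); // default is used
        parameters.setName("Gelly Iteration");
        parameters.setParallelism(16);
        System.out.println(parameters.getName("Vertex-centric iteration"));
        System.out.println(parameters.getParallelism());
    }
}
```

Passing the default into the getter keeps the default-name string in `VertexCentricIteration`, where the update and messaging functions needed to build it are available.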
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index c54ee0c..ca66521 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -19,10 +19,7 @@
package org.apache.flink.graph.spargel;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
@@ -82,23 +79,13 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
- private final Map<String, Aggregator<?>> aggregators;
-
private final int maximumNumberOfIterations;
- private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-
- private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-
private final TypeInformation<Message> messageType;
private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
-
- private String name;
-
- private int parallelism = -1;
-
- private boolean unmanagedSolutionSet;
+
+ private IterationConfiguration configuration;
// ----------------------------------------------------------------------------------
@@ -115,8 +102,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
this.updateFunction = uf;
this.messagingFunction = mf;
this.edgesWithValue = edgesWithValue;
- this.maximumNumberOfIterations = maximumNumberOfIterations;
- this.aggregators = new HashMap<String, Aggregator<?>>();
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
this.messageType = getMessageType(mf);
}
@@ -124,97 +110,6 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
}
- /**
- * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
- * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
- * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
- *
- * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
- * @param aggregator The aggregator.
- */
- public void registerAggregator(String name, Aggregator<?> aggregator) {
- this.aggregators.put(name, aggregator);
- }
-
- /**
- * Adds a data set as a broadcast set to the messaging function.
- *
- * @param name The name under which the broadcast data is available in the messaging function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
- this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Adds a data set as a broadcast set to the vertex update function.
- *
- * @param name The name under which the broadcast data is available in the vertex update function.
- * @param data The data set to be broadcasted.
- */
- public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
- this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
- }
-
- /**
- * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
- *
- * @param name The name for the iteration.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets the name from this vertex-centric iteration.
- *
- * @return The name of the iteration.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the parallelism for the iteration.
- *
- * @param parallelism The parallelism.
- */
- public void setParallelism(int parallelism) {
- Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
- this.parallelism = parallelism;
- }
-
- /**
- * Gets the iteration's parallelism.
- *
- * @return The iterations parallelism, or -1, if not set.
- */
- public int getParallelism() {
- return parallelism;
- }
-
- /**
- * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
- * in serialized form) or as a simple object map.
- * By default, the solution set runs in managed memory.
- *
- * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
- */
- public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
- this.unmanagedSolutionSet = unmanaged;
- }
-
- /**
- * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
- * in serialized form) or as a simple object map.
- * By default, the solution set runs in managed memory.
- *
- * @return True, if the solution set is in unmanaged memory, false otherwise.
- */
- public boolean isSolutionSetUnmanagedMemory() {
- return this.unmanagedSolutionSet;
- }
-
// --------------------------------------------------------------------------------------------
// Custom Operator behavior
// --------------------------------------------------------------------------------------------
@@ -249,20 +144,27 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
- // set up the iteration operator
- final String name = (this.name != null) ? this.name :
- "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
final int[] zeroKeyPos = new int[] {0};
final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
- iteration.name(name);
- iteration.parallelism(parallelism);
- iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
-
- // register all aggregators
- for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
- iteration.registerAggregator(entry.getKey(), entry.getValue());
+
+ // set up the iteration operator
+ if (this.configuration != null) {
+
+ iteration.name(this.configuration.getName(
+ "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
+ iteration.parallelism(this.configuration.getParallelism());
+ iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+ // register all aggregators
+ for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+ iteration.registerAggregator(entry.getKey(), entry.getValue());
+ }
+ }
+ else {
+ // no configuration provided; set default name
+ iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
}
// build the messaging function (co group)
@@ -272,8 +174,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
// configure coGroup message function with name and broadcast variables
messages = messages.name("Messaging");
- for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
- messages = messages.withBroadcastSet(e.f1, e.f0);
+
+ if (this.configuration != null) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+ messages = messages.withBroadcastSet(e.f1, e.f0);
+ }
}
VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
@@ -284,8 +189,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
// configure coGroup update function with name and broadcast variables
updates = updates.name("Vertex State Updates");
- for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
- updates = updates.withBroadcastSet(e.f1, e.f0);
+
+ if (this.configuration != null) {
+ for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+ updates = updates.withBroadcastSet(e.f1, e.f0);
+ }
}
// let the operator know that we preserve the key field
@@ -452,4 +360,20 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
return this.resultType;
}
}
+
+ /**
+ * Configures this vertex-centric iteration with the provided parameters.
+ *
+ * @param parameters the configuration parameters
+ */
+ public void configure(IterationConfiguration parameters) {
+ this.configuration = parameters;
+ }
+
+ /**
+ * @return the configuration parameters of this vertex-centric iteration
+ */
+ public IterationConfiguration getIterationConfiguration() {
+ return this.configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 1157a18..5a7cd5c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -92,7 +92,7 @@ public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKe
}
/**
- * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
+ * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
*
* @param name The name of the aggregator.
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index ffe91d9..d84952a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.junit.Assert;
@@ -48,9 +47,8 @@ public class CollectionModeSuperstepITCase {
Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
- VertexCentricIteration<Long, Long, Long, Long> iteration =
- graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
- Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+ Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+ new UpdateFunction(), new MessageFunction(), 10);
result.getVertices().map(
new VertexToTuple2Map<Long, Long>()).output(
@@ -83,5 +81,4 @@ public class CollectionModeSuperstepITCase {
return 1l;
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
new file mode 100644
index 0000000..b497070
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.List;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
+
+ public VertexCentricConfigurationITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testRunWithConfiguration() throws Exception {
+ /*
+ * Test Graph's runVertexCentricIteration when configuration parameters are provided
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+ // create the configuration object
+ IterationConfiguration parameters = new IterationConfiguration();
+
+ parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
+ parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
+ parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+
+ Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+ new UpdateFunction(), new MessageFunction(), 10, parameters);
+
+ result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+ expectedResult = "1 11\n" +
+ "2 11\n" +
+ "3 11\n" +
+ "4 11\n" +
+ "5 11";
+ }
+
+ @Test
+ public void testIterationConfiguration() throws Exception {
+
+ /*
+ * Test name, parallelism and solutionSetUnmanaged parameters
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ VertexCentricIteration<Long, Long, Long, Long> iteration = VertexCentricIteration
+ .withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(),
+ new DummyMessageFunction(), 10);
+
+ IterationConfiguration parameters = new IterationConfiguration();
+ parameters.setName("gelly iteration");
+ parameters.setParallelism(2);
+ parameters.setSolutionSetUnmanagedMemory(true);
+
+ iteration.configure(parameters);
+
+ Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+ Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+ Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+ DataSet<Vertex<Long, Long>> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+
+ result.map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+ expectedResult = "1 11\n" +
+ "2 12\n" +
+ "3 13\n" +
+ "4 14\n" +
+ "5 15";
+ }
+
+ @SuppressWarnings("serial")
+ public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+ LongSumAggregator aggregator = new LongSumAggregator();
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ @SuppressWarnings("unchecked")
+ List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+ Assert.assertEquals(1, bcastSet.get(0));
+ Assert.assertEquals(2, bcastSet.get(1));
+ Assert.assertEquals(3, bcastSet.get(2));
+
+ // test aggregator
+ aggregator = getIterationAggregator("superstepAggregator");
+ }
+
+ @Override
+ public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+ long superstep = getSuperstepNumber();
+ aggregator.aggregate(superstep);
+ setNewVertexValue(vertexValue + 1);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ @SuppressWarnings("unchecked")
+ List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+ Assert.assertEquals(4, bcastSet.get(0));
+ Assert.assertEquals(5, bcastSet.get(1));
+ Assert.assertEquals(6, bcastSet.get(2));
+
+ // test aggregator
+ if (getSuperstepNumber() == 2) {
+ long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+ Assert.assertEquals(5, aggrValue);
+ }
+ }
+
+ @Override
+ public void sendMessages(Long vertexId, Long vertexValue) {
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertexValue);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+ @Override
+ public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+ setNewVertexValue(vertexValue + 1);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+ @Override
+ public void sendMessages(Long vertexId, Long vertexValue) {
+ //send message to keep vertices active
+ sendMessageToAllNeighbors(vertexValue);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+ public Long map(Vertex<Long, Long> value) {
+ return 1L;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
index 8fb9a11..380e027 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -65,8 +64,7 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase
DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
- VertexCentricIteration<Long, Long, Long, NullValue> iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
- Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(iteration);
+ Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
result.getVertices().writeAsCsv(resultPath, "\n", " ");
env.execute();
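
The VertexCentricIteration changes in this commit apply the configuration only when one was provided, and otherwise fall back to a default operator name. A minimal sketch of that fallback pattern follows. It is not the Gelly code: `ConfigFallbackSketch`, `SketchConfiguration` and `resolveName` are hypothetical names used only for illustration, and the only behaviour assumed is what the diff and the test show, namely that `getName(defaultName)` returns the configured name when one was set.

```java
public class ConfigFallbackSketch {

    /** Hypothetical stand-in for a configuration object; not the Gelly class. */
    static final class SketchConfiguration {
        private String name; // stays null until setName() is called

        void setName(String name) {
            this.name = name;
        }

        /** Returns the configured name, or the given default if none was set. */
        String getName(String defaultName) {
            return name != null ? name : defaultName;
        }
    }

    /** Resolves the operator name while tolerating a missing configuration. */
    static String resolveName(SketchConfiguration configuration, String defaultName) {
        return configuration != null ? configuration.getName(defaultName) : defaultName;
    }

    public static void main(String[] args) {
        SketchConfiguration configured = new SketchConfiguration();
        configured.setName("gelly iteration");

        // a configured name wins over the default
        System.out.println(resolveName(configured, "default name"));
        // a configuration without a name falls back to the default
        System.out.println(resolveName(new SketchConfiguration(), "default name"));
        // no configuration at all also falls back to the default
        System.out.println(resolveName(null, "default name"));
    }
}
```

Keeping the null check in one helper avoids repeating the default name string at every call site, which the diff above does in two places.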
[3/4] flink git commit: [FLINK-1741] [gelly] Adds Jaccard Similarity
Metric Example
Posted by va...@apache.org.
[FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example
This closes #544
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e281e4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e281e4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e281e4d6
Branch: refs/heads/master
Commit: e281e4d6fc22e498f56d88b0f661972345bf0e55
Parents: 48713a8
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 30 14:50:09 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 11:04:19 2015 +0300
----------------------------------------------------------------------
.../JaccardSimilarityMeasureExample.java | 212 +++++++++++++++++++
.../utils/JaccardSimilarityMeasureData.java | 58 +++++
.../JaccardSimilarityMeasureExampleITCase.java | 74 +++++++
3 files changed, 344 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
new file mode 100644
index 0000000..c81aeb3
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the source and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage: <code>JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasureExample implements ProgramDescription {
+
+ public static void main(String [] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+ Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+ DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+ graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+ Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+
+ // the edge value will be the Jaccard similarity coefficient (number of common neighbors / size of the union of the neighbor sets)
+ DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+ .map(new WeighEdgesMapper());
+
+ DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+ new MapFunction<Tuple2<Double, Double>, Double>() {
+
+ @Override
+ public Double map(Tuple2<Double, Double> value) throws Exception {
+ return value.f1;
+ }
+ }).getEdges();
+
+ // emit result
+ if (fileOutput) {
+ result.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ result.print();
+ }
+
+ env.execute("Executing Jaccard Similarity Measure");
+ }
+
+ @Override
+ public String getDescription() {
+ return "Vertex Jaccard Similarity Measure";
+ }
+
+ /**
+ * Each vertex will have a HashSet containing its neighbor ids as value.
+ */
+ private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+
+ @Override
+ public Vertex<Long, HashSet<Long>> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) throws Exception {
+
+ HashSet<Long> neighborsHashSet = new HashSet<Long>();
+ long vertexId = -1;
+
+ for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+ neighborsHashSet.add(getNeighborID(edge));
+ vertexId = edge.f0;
+ }
+ return new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet);
+ }
+ }
+
+ /**
+ * The edge weight will be the Jaccard coefficient, which is computed as follows:
+ *
+ * Consider the edge x-y.
+ * We denote by sizeX and sizeY the sizes of the neighbor hash sets of x and y, respectively.
+ * sizeX + sizeY = |union| + |intersection| of the two neighborhoods.
+ * After hashSetX.addAll(hashSetY), hashSetX.size() = |union|, since a HashSet holds distinct elements.
+ * The size of the intersection can then be deduced by subtraction.
+ *
+ * The Jaccard similarity coefficient is then |intersection| / |union|.
+ */
+ private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
+ Tuple3<Long, Long, Double>> {
+
+ @Override
+ public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
+ throws Exception {
+
+ Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+ Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+ long unionPlusIntersection = source.getValue().size() + target.getValue().size();
+ // within a HashSet, all elements are distinct
+ source.getValue().addAll(target.getValue());
+ // the source value contains the union
+ long union = source.getValue().size();
+ long intersection = unionPlusIntersection - union;
+
+ return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
+ }
+ }
+
+ /**
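+ * Helper method that extracts the neighbor id given an edge.
+ * @param edge a (vertex id, edge) pair, where the vertex id is one endpoint of the edge
+ * @return the id of the other endpoint of the edge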
+ */
+ private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+ if(edge.f1.getSource().equals(edge.f0)) {
+ return edge.f1.getTarget();
+ } else {
+ return edge.f1.getSource();
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+
+ private static boolean parseParameters(String [] args) {
+ if(args.length > 0) {
+ if(args.length != 2) {
+ System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
+ return false;
+ }
+
+ fileOutput = true;
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
+ }
+
+ return true;
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .ignoreComments("#")
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+ @Override
+ public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+ return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
+ }
+ });
+ } else {
+ return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
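
The arithmetic described in the WeighEdgesMapper javadoc can be checked without Flink. The following is a minimal standalone sketch, not part of the commit: `JaccardSketch` and `jaccard` are hypothetical names, and returning 0 for two empty neighbor sets is an assumption of this sketch (the example above does not handle that case). Unlike the mapper, it copies the first set before calling `addAll`, so the caller's set is left unchanged.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardSketch {

    /**
     * Jaccard coefficient of two neighbor sets, using the identity
     * |X| + |Y| = |union| + |intersection|.
     */
    static double jaccard(Set<Long> neighborsX, Set<Long> neighborsY) {
        long unionPlusIntersection = neighborsX.size() + neighborsY.size();

        // copy first so that the caller's set is not mutated
        Set<Long> union = new HashSet<Long>(neighborsX);
        union.addAll(neighborsY);

        if (union.isEmpty()) {
            // assumption of this sketch: two empty neighborhoods have similarity 0
            return 0.0;
        }

        long intersection = unionPlusIntersection - union.size();
        return (double) intersection / union.size();
    }

    public static void main(String[] args) {
        // neighborhoods of vertices 1 and 2 in the complete graph on {1, ..., 5},
        // which is the shape of the default edge data set
        Set<Long> neighborsOf1 = new HashSet<Long>(Arrays.asList(2L, 3L, 4L, 5L));
        Set<Long> neighborsOf2 = new HashSet<Long>(Arrays.asList(1L, 3L, 4L, 5L));

        // sizes sum to 8, the union has 5 elements, so the intersection has 3
        System.out.println(jaccard(neighborsOf1, neighborsOf2));
    }
}
```

For these two sets the intersection has 3 elements and the union has 5, which agrees with the 0.6 edge weights listed in JACCARD_EDGES.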
http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..7564b95
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+ public static final String EDGES = "1 2\n" + "1 3\n" + "1 4\n" + "1 5\n" + "2 3\n" + "2 4\n" +
+ "2 5\n" + "3 4\n" + "3 5\n" + "4 5";
+
+ public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, new Double(0)));
+ edges.add(new Edge<Long, Double>(1L, 3L, new Double(0)));
+ edges.add(new Edge<Long, Double>(1L, 4L, new Double(0)));
+ edges.add(new Edge<Long, Double>(1L, 5L, new Double(0)));
+ edges.add(new Edge<Long, Double>(2L, 3L, new Double(0)));
+ edges.add(new Edge<Long, Double>(2L, 4L, new Double(0)));
+ edges.add(new Edge<Long, Double>(2L, 5L, new Double(0)));
+ edges.add(new Edge<Long, Double>(3L, 4L, new Double(0)));
+ edges.add(new Edge<Long, Double>(3L, 5L, new Double(0)));
+ edges.add(new Edge<Long, Double>(4L, 5L, new Double(0)));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+ "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+ private JaccardSimilarityMeasureData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
new file mode 100644
index 0000000..7269ed7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasureExample;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureExampleITCase extends MultipleProgramsTestBase {
+
+ private String edgesPath;
+
+ private String debugResultPath;
+
+ private String resultPath;
+
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public JaccardSimilarityMeasureExampleITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+ edgesPath = edgesFile.toURI().toString();
+ }
+
+ @Test
+ public void testJaccardSimilarityMeasureExample() throws Exception {
+ JaccardSimilarityMeasureExample.main(new String[]{edgesPath, resultPath});
+ expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+}
[4/4] flink git commit: [gelly] minor javadoc correction: the Graph
is always directed.
Posted by va...@apache.org.
[gelly] minor javadoc correction: the Graph is always directed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c518df94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c518df94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c518df94
Branch: refs/heads/master
Commit: c518df944fd47ad0d0fda1fd6aaa7f8fac5b4b18
Parents: e281e4d
Author: vasia <va...@apache.org>
Authored: Fri Apr 10 11:25:59 2015 +0300
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 11:26:38 2015 +0300
----------------------------------------------------------------------
.../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c518df94/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c84eb12..62173e3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -68,7 +68,7 @@ import org.apache.flink.types.NullValue;
* @see org.apache.flink.graph.Vertex
*
* @param <K> the key type for edge and vertex identifiers
- * @param <VV> the value type for vertexes
+ * @param <VV> the value type for vertices
* @param <EV> the value type for edges
*/
@SuppressWarnings("serial")
@@ -79,8 +79,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
private final DataSet<Edge<K, EV>> edges;
/**
- * Creates a graph from two DataSets: vertices and edges and allow setting
- * the undirected property
+ * Creates a graph from two DataSets: vertices and edges
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
[2/4] flink git commit: [FLINK-1694] [gelly] fixed typo and a small
code simplification
Posted by va...@apache.org.
[FLINK-1694] [gelly] fixed typo and a small code simplification
This closes #547
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48713a8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48713a8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48713a8a
Branch: refs/heads/master
Commit: 48713a8a8f328d3880c823a754b9ff6abc618b2a
Parents: e98bd85
Author: vasia <va...@gmail.com>
Authored: Thu Apr 9 22:04:55 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 10:54:46 2015 +0300
----------------------------------------------------------------------
docs/gelly_guide.md | 2 +-
.../src/main/java/org/apache/flink/graph/Graph.java | 8 ++------
2 files changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48713a8a/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 203cffa..cc85296 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -373,7 +373,7 @@ and can be specified using the `setName()` method.
* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.
-* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.
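Note: the options described in the guide excerpt above (name, parallelism, solution set in unmanaged memory) and the delegation introduced in this commit's Graph.java change can be illustrated with a small, self-contained sketch. Everything in it is a hypothetical stand-in and not Flink's actual code: `IterationConfig`, `run`, and the default name are invented for illustration. Only the setter names mirror the ones mentioned in the guide. It shows a short overload forwarding to the configurable one with a null configuration, which then falls back to defaults.

```java
// Hypothetical sketch: these classes only imitate the shape of the API
// described in the guide; they are not Flink's IterationConfiguration or Graph.
public class IterationConfigSketch {

    /** Stand-in configuration holder with the three options named in the guide. */
    public static final class IterationConfig {
        private String name = "Vertex-centric iteration"; // placeholder default, an assumption
        private int parallelism = -1;                      // -1 means "use the default" in this sketch
        private boolean solutionSetUnmanaged = false;      // managed memory by default, as the guide states

        public void setName(String name) {
            this.name = name;
        }

        public void setParallelism(int parallelism) {
            if (parallelism < 1) {
                throw new IllegalArgumentException("parallelism must be at least 1");
            }
            this.parallelism = parallelism;
        }

        public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
            this.solutionSetUnmanaged = unmanaged;
        }
    }

    /** Convenience overload: forwards to the configurable variant with a null configuration. */
    public static String run(int maxIterations) {
        return run(maxIterations, null);
    }

    /** Configurable variant: a null configuration falls back to the defaults. */
    public static String run(int maxIterations, IterationConfig config) {
        IterationConfig effective = (config != null) ? config : new IterationConfig();
        String parallelism = effective.parallelism > 0
                ? String.valueOf(effective.parallelism)
                : "default";
        return effective.name
                + " [maxIterations=" + maxIterations
                + ", parallelism=" + parallelism
                + ", solutionSetUnmanaged=" + effective.solutionSetUnmanaged + "]";
    }

    public static void main(String[] args) {
        IterationConfig config = new IterationConfig();
        config.setName("Shortest paths");
        config.setParallelism(4);
        config.setSolutionSetUnmanagedMemory(true);

        System.out.println(run(10));         // defaults only
        System.out.println(run(10, config)); // explicit configuration
    }
}
```

Keeping one code path for both overloads means the defaults live in a single place, which is the same motivation as the simplification in this commit.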
http://git-wip-us.apache.org/repos/asf/flink/blob/48713a8a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 8280ba9..c84eb12 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1165,12 +1165,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations) {
- VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
- edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
-
- DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
-
- return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+ return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+ maximumNumberOfIterations, null);
}
/**