You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/06/21 12:30:41 UTC
flink git commit: [FLINK-2140][gelly] Allowed access to the number of
vertices in the GSA functions
Repository: flink
Updated Branches:
refs/heads/master 05bff2295 -> 57459e95e
[FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions
Similar to the vertex-centric approach, this PR enables users to access the total number of vertices from within the gather, sum and/or apply functions of a GSA iteration.
Author: andralungu <lu...@gmail.com>
Closes #779 from andralungu/numVerticesGSA and squashes the following commits:
cffd944 [andralungu] [FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57459e95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57459e95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57459e95
Branch: refs/heads/master
Commit: 57459e95e57630621fcdd6247857edd9824b900d
Parents: 05bff22
Author: andralungu <lu...@gmail.com>
Authored: Sun Jun 21 12:27:27 2015 +0200
Committer: andra <an...@apache.org>
Committed: Sun Jun 21 12:27:27 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 45 +++++++++++++++++++-
.../flink/graph/IterationConfiguration.java | 23 ++++++++++
.../apache/flink/graph/gsa/ApplyFunction.java | 21 +++++++++
.../apache/flink/graph/gsa/GatherFunction.java | 21 +++++++++
.../graph/gsa/GatherSumApplyIteration.java | 19 +++++++++
.../org/apache/flink/graph/gsa/SumFunction.java | 21 +++++++++
.../spargel/VertexCentricConfiguration.java | 23 ----------
.../test/GatherSumApplyConfigurationITCase.java | 17 ++++++++
8 files changed, 166 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index c788012..eb10405 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -461,7 +461,6 @@ The number of vertices can then be accessed in the vertex update function and in
The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using the `getInDegree()` and `getOutDegree()` methods.
If the degrees option is not set in the configuration, these methods will return -1.
-
* <strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method.
{% highlight java %}
@@ -687,6 +686,50 @@ Currently, the following parameters can be specified:
* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/programming_guide.html#broadcast-variables) to the `GatherFunction`, `SumFunction` and `ApplyFunction`, using the methods `addBroadcastSetForGatherFunction()`, `addBroadcastSetForSumFunction()` and `addBroadcastSetForApplyFunction` methods, respectively.
+* <strong>Number of Vertices</strong>: Allows access to the total number of vertices within the iteration. This option can be enabled using the `setOptNumVertices()` method.
+The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
+
+The following example illustrates the usage of the number of vertices option.
+
+{% highlight java %}
+
+Graph<Long, Double, Double> graph = ...
+
+// configure the iteration
+GSAConfiguration parameters = new GSAConfiguration();
+
+// set the number of vertices option to true
+parameters.setOptNumVertices(true);
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(
+ new Gather(), new Sum(), new Apply(),
+ maxIterations, parameters);
+
+// user-defined functions
+public static final class Gather {
+ ...
+ // get the number of vertices
+ long numVertices = getNumberOfVertices();
+ ...
+}
+
+public static final class Sum {
+ ...
+ // get the number of vertices
+ long numVertices = getNumberOfVertices();
+ ...
+}
+
+public static final class Apply {
+ ...
+ // get the number of vertices
+ long numVertices = getNumberOfVertices();
+ ...
+}
+
+{% endhighlight %}
+
[Back to top](#top)
### Vertex-centric and GSA Comparison
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 3086172..3215194 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -40,6 +40,9 @@ public abstract class IterationConfiguration {
/** flag that defines whether the solution set is kept in managed memory **/
private boolean unmanagedSolutionSet = false;
+
+ /** flag that defines whether the number of vertices option is set **/
+ private boolean optNumVertices = false;
public IterationConfiguration() {}
@@ -109,6 +112,26 @@ public abstract class IterationConfiguration {
}
/**
+ * Gets whether the number of vertices option is set.
+ * By default, the number of vertices option is not set.
+ *
+ * @return True, if the number of vertices option is set, false otherwise.
+ */
+ public boolean isOptNumVertices() {
+ return optNumVertices;
+ }
+
+ /**
+ * Sets the number of vertices option.
+ * By default, the number of vertices option is not set.
+ *
+ * @param optNumVertices True, to set this option, false otherwise.
+ */
+ public void setOptNumVertices(boolean optNumVertices) {
+ this.optNumVertices = optNumVertices;
+ }
+
+ /**
* Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
* via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
* {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index d88fe0d..ed0cf70 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -30,6 +30,27 @@ import java.util.Collection;
@SuppressWarnings("serial")
public abstract class ApplyFunction<K, VV, M> implements Serializable {
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows access to the total number of vertices inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+
public abstract void apply(M newValue, VV currentValue);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 37ff2d6..5a09a5a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -28,6 +28,27 @@ import java.util.Collection;
@SuppressWarnings("serial")
public abstract class GatherFunction<VV, EV, M> implements Serializable {
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows access to the total number of vertices inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+
public abstract M gather(Neighbor<VV, EV> neighbor);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index a80369d..389cf02 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
@@ -38,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
import java.util.Map;
@@ -116,6 +118,23 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
+ // create a graph
+ Graph<K, VV, EV> graph =
+ Graph.fromDataSet(vertexDataSet, edgeDataSet, ExecutionEnvironment.getExecutionEnvironment());
+
+ // check whether the numVertices option is set and, if so, compute the total number of vertices
+ // and set it within the gather, sum and apply functions
+ if (this.configuration != null && this.configuration.isOptNumVertices()) {
+ try {
+ long numberOfVertices = graph.numberOfVertices();
+ gather.setNumberOfVertices(numberOfVertices);
+ sum.setNumberOfVertices(numberOfVertices);
+ apply.setNumberOfVertices(numberOfVertices);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
// Prepare UDFs
GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 16cd682..69baae4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -28,6 +28,27 @@ import java.util.Collection;
@SuppressWarnings("serial")
public abstract class SumFunction<VV, EV, M> implements Serializable {
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows access to the total number of vertices inside an iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices = -1L;
+
+ /**
+ * Retrieves the number of vertices in the graph.
+ * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+ * option has been set; -1 otherwise.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ //---------------------------------------------------------------------------------------------
+
public abstract M sum(M arg0, M arg1);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
index e76c174..afd4ffd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -47,9 +47,6 @@ public class VertexCentricConfiguration extends IterationConfiguration {
/** flag that defines whether the degrees option is set **/
private boolean optDegrees = false;
- /** flag that defines whether the number of vertices option is set **/
- private boolean optNumVertices = false;
-
/** the direction in which the messages should be sent **/
private EdgeDirection direction = EdgeDirection.OUT;
@@ -116,26 +113,6 @@ public class VertexCentricConfiguration extends IterationConfiguration {
}
/**
- * Gets whether the number of vertices option is set.
- * By default, the number of vertices option is not set.
- *
- * @return True, if the number of vertices option is set, false otherwise.
- */
- public boolean isOptNumVertices() {
- return optNumVertices;
- }
-
- /**
- * Sets the number of vertices option.
- * By default, the number of vertices option is not set.
- *
- * @param optNumVertices True, to set this option, false otherwise.
- */
- public void setOptNumVertices(boolean optNumVertices) {
- this.optNumVertices = optNumVertices;
- }
-
- /**
* Gets the direction in which messages are sent in the MessagingFunction.
* By default the messaging direction is OUT.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index ca5d5d9..5befafe 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -84,6 +84,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
parameters.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6));
parameters.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9));
parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+ parameters.setOptNumVertices(true);
Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
new Apply(), 10, parameters);
@@ -151,6 +152,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
Assert.assertEquals(7, aggrValue);
}
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
}
public Long gather(Neighbor<Long, Long> neighbor) {
@@ -175,6 +179,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
}
public Long sum(Long newValue, Long currentValue) {
@@ -201,6 +208,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test aggregator
aggregator = getIterationAggregator("superstepAggregator");
+
+ // test number of vertices
+ Assert.assertEquals(5, getNumberOfVertices());
}
public void apply(Long summedValue, Long origValue) {
@@ -213,6 +223,13 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
@SuppressWarnings("serial")
private static final class DummyGather extends GatherFunction<Long, Long, Long> {
+ @Override
+ public void preSuperstep() {
+ // test number of vertices
+ // when the numVertices option is not set, -1 is returned
+ Assert.assertEquals(-1, getNumberOfVertices());
+ }
+
public Long gather(Neighbor<Long, Long> neighbor) {
return neighbor.getNeighborValue();
}