You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/06/21 12:30:41 UTC

flink git commit: [FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions

Repository: flink
Updated Branches:
  refs/heads/master 05bff2295 -> 57459e95e


[FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions

Similar to the vertex-centric approach, this PR enables users to access the total number of vertices from within the gather, sum and/or apply functions of a GSA iteration.

Author: andralungu <lu...@gmail.com>

Closes #779 from andralungu/numVerticesGSA and squashes the following commits:

cffd944 [andralungu] [FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57459e95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57459e95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57459e95

Branch: refs/heads/master
Commit: 57459e95e57630621fcdd6247857edd9824b900d
Parents: 05bff22
Author: andralungu <lu...@gmail.com>
Authored: Sun Jun 21 12:27:27 2015 +0200
Committer: andra <an...@apache.org>
Committed: Sun Jun 21 12:27:27 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 45 +++++++++++++++++++-
 .../flink/graph/IterationConfiguration.java     | 23 ++++++++++
 .../apache/flink/graph/gsa/ApplyFunction.java   | 21 +++++++++
 .../apache/flink/graph/gsa/GatherFunction.java  | 21 +++++++++
 .../graph/gsa/GatherSumApplyIteration.java      | 19 +++++++++
 .../org/apache/flink/graph/gsa/SumFunction.java | 21 +++++++++
 .../spargel/VertexCentricConfiguration.java     | 23 ----------
 .../test/GatherSumApplyConfigurationITCase.java | 17 ++++++++
 8 files changed, 166 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index c788012..eb10405 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -461,7 +461,6 @@ The number of vertices can then be accessed in the vertex update function and in
 The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using the `getInDegree()` and `getOutDegree()` methods.
 If the degrees option is not set in the configuration, these methods will return -1.
 
-
 * <strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method.
 
 {% highlight java %}
@@ -687,6 +686,50 @@ Currently, the following parameters can be specified:
 
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/programming_guide.html#broadcast-variables) to the `GatherFunction`, `SumFunction` and `ApplyFunction`, using the methods `addBroadcastSetForGatherFunction()`, `addBroadcastSetForSumFunction()` and `addBroadcastSetForApplyFunction` methods, respectively.
 
+* <strong>Number of Vertices</strong>: Enables access to the total number of vertices from within the iteration. This option is enabled by calling `setOptNumVertices(true)` on the configuration.
+The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
+
+The following example illustrates the usage of the number of vertices option.
+
+{% highlight java %}
+
+Graph<Long, Double, Double> graph = ...
+
+// configure the iteration
+GSAConfiguration parameters = new GSAConfiguration();
+
+// set the number of vertices option to true
+parameters.setOptNumVertices(true);
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
+				new Gather(), new Sum(), new Apply(),
+			    maxIterations, parameters);
+
+// user-defined functions
+public static final class Gather {
+	...
+	// get the number of vertices
+	long numVertices = getNumberOfVertices();
+	...
+}
+
+public static final class Sum {
+	...
+    // get the number of vertices
+    long numVertices = getNumberOfVertices();
+    ...
+}
+
+public static final class Apply {
+	...
+    // get the number of vertices
+    long numVertices = getNumberOfVertices();
+    ...
+}
+
+{% endhighlight %}
+
 [Back to top](#top)
 
 ### Vertex-centric and GSA Comparison

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 3086172..3215194 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -40,6 +40,9 @@ public abstract class IterationConfiguration {
 
 	/** flag that defines whether the solution set is kept in managed memory **/
 	private boolean unmanagedSolutionSet = false;
+
+	/** flag that defines whether the number of vertices option is set **/
+	private boolean optNumVertices = false;
 	
 	public IterationConfiguration() {}
 
@@ -109,6 +112,26 @@ public abstract class IterationConfiguration {
 	}
 
 	/**
+	 * Gets whether the number of vertices option is set.
+	 * By default, the number of vertices option is not set.
+	 *
+	 * @return True, if the number of vertices option is set, false otherwise.
+	 */
+	public boolean isOptNumVertices() {
+		return optNumVertices;
+	}
+
+	/**
+	 * Sets the number of vertices option.
+	 * By default, the number of vertices option is not set.
+	 *
+	 * @param optNumVertices True, to set this option, false otherwise.
+	 */
+	public void setOptNumVertices(boolean optNumVertices) {
+		this.optNumVertices = optNumVertices;
+	}
+
+	/**
 	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
 	 * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
 	 * {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index d88fe0d..ed0cf70 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -30,6 +30,27 @@ import java.util.Collection;
 @SuppressWarnings("serial")
 public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
 	public abstract void apply(M newValue, VV currentValue);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 37ff2d6..5a09a5a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -28,6 +28,27 @@ import java.util.Collection;
 @SuppressWarnings("serial")
 public abstract class GatherFunction<VV, EV, M> implements Serializable {
 
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
 	public abstract M gather(Neighbor<VV, EV> neighbor);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index a80369d..389cf02 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
@@ -38,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 import java.util.Map;
@@ -116,6 +118,23 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
 		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
 
+		// create a graph
+		Graph<K, VV, EV> graph =
+				Graph.fromDataSet(vertexDataSet, edgeDataSet, ExecutionEnvironment.getExecutionEnvironment());
+
+		// check whether the numVertices option is set and, if so, compute the total number of vertices
+		// and set it within the gather, sum and apply functions
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			try {
+				long numberOfVertices = graph.numberOfVertices();
+				gather.setNumberOfVertices(numberOfVertices);
+				sum.setNumberOfVertices(numberOfVertices);
+				apply.setNumberOfVertices(numberOfVertices);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
 		// Prepare UDFs
 		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
 		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 16cd682..69baae4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -28,6 +28,27 @@ import java.util.Collection;
 @SuppressWarnings("serial")
 public abstract class SumFunction<VV, EV, M> implements Serializable {
 
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link org.apache.flink.graph.IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
 	public abstract M sum(M arg0, M arg1);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
index e76c174..afd4ffd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -47,9 +47,6 @@ public class VertexCentricConfiguration extends IterationConfiguration {
 	/** flag that defines whether the degrees option is set **/
 	private boolean optDegrees = false;
 
-	/** flag that defines whether the number of vertices option is set **/
-	private boolean optNumVertices = false;
-
 	/** the direction in which the messages should be sent **/
 	private EdgeDirection direction = EdgeDirection.OUT;
 
@@ -116,26 +113,6 @@ public class VertexCentricConfiguration extends IterationConfiguration {
 	}
 
 	/**
-	 * Gets whether the number of vertices option is set.
-	 * By default, the number of vertices option is not set.
-	 *
-	 * @return True, if the number of vertices option is set, false otherwise.
-	 */
-	public boolean isOptNumVertices() {
-		return optNumVertices;
-	}
-
-	/**
-	 * Sets the number of vertices option.
-	 * By default, the number of vertices option is not set.
-	 *
-	 * @param optNumVertices True, to set this option, false otherwise.
-	 */
-	public void setOptNumVertices(boolean optNumVertices) {
-		this.optNumVertices = optNumVertices;
-	}
-
-	/**
 	 * Gets the direction in which messages are sent in the MessagingFunction.
 	 * By default the messaging direction is OUT.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/57459e95/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index ca5d5d9..5befafe 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -84,6 +84,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		parameters.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6));
 		parameters.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9));
 		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+		parameters.setOptNumVertices(true);
 
 		Graph<Long, Long, Long> result = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
 				new Apply(), 10, parameters);
@@ -151,6 +152,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 				Assert.assertEquals(7, aggrValue);
 			}
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
 		}
 
 		public Long gather(Neighbor<Long, Long> neighbor) {
@@ -175,6 +179,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
 		}
 
 		public Long sum(Long newValue, Long currentValue) {
@@ -201,6 +208,9 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
 		}
 
 		public void apply(Long summedValue, Long origValue) {
@@ -213,6 +223,13 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	@SuppressWarnings("serial")
 	private static final class DummyGather extends GatherFunction<Long, Long, Long> {
 
+		@Override
+		public void preSuperstep() {
+			// test number of vertices
+			// when the numVertices option is not set, -1 is returned
+			Assert.assertEquals(-1, getNumberOfVertices());
+		}
+
 		public Long gather(Neighbor<Long, Long> neighbor) {
 			return neighbor.getNeighborValue();
 		}