You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:18:47 UTC

flink git commit: [FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is called at most once per updateVertex

Repository: flink
Updated Branches:
  refs/heads/release-0.9 827efd07c -> e8802f90a


[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is called at most once per updateVertex

This closes #1027


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8802f90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8802f90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8802f90

Branch: refs/heads/release-0.9
Commit: e8802f90a4d38dbd4f3fc12b973639dbf50b61bb
Parents: 827efd0
Author: Gabor Gevay <gg...@gmail.com>
Authored: Sun Aug 16 21:40:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 14:18:14 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/spargel/VertexUpdateFunction.java       | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8802f90/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 9930b50..248925b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -79,7 +79,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	
 	/**
 	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
 	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
 	 * 
 	 * @param vertex The vertex.
@@ -105,10 +105,16 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	
 	/**
 	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+	 *
+	 * This should be called at most once per updateVertex.
 	 * 
 	 * @param newValue The new vertex value.
 	 */
 	public void setNewVertexValue(VV newValue) {
+		if(setNewVertexValueCalled) {
+			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+		}
+		setNewVertexValueCalled = true;
 		if(isOptDegrees()) {
 			outValWithDegrees.f1.f0 = newValue;
 			outWithDegrees.collect(outValWithDegrees);
@@ -178,6 +184,8 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 
 	private long outDegree = -1;
 
+	private boolean setNewVertexValueCalled;
+
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 	}
@@ -185,6 +193,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
 		this.outVal = outVal;
 		this.out = out;
+		setNewVertexValueCalled = false;
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -192,6 +201,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 			Collector out) {
 		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
 		this.outWithDegrees = out;
+		setNewVertexValueCalled = false;
 	}
 
 	/**