You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:18:47 UTC
flink git commit: [FLINK-2527] [gelly] Ensure that
VertexUpdateFunction.setNewVertexValue is called at most once per
updateVertex
Repository: flink
Updated Branches:
refs/heads/release-0.9 827efd07c -> e8802f90a
[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is called at most once per updateVertex
This closes #1027
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8802f90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8802f90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8802f90
Branch: refs/heads/release-0.9
Commit: e8802f90a4d38dbd4f3fc12b973639dbf50b61bb
Parents: 827efd0
Author: Gabor Gevay <gg...@gmail.com>
Authored: Sun Aug 16 21:40:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 14:18:14 2015 +0200
----------------------------------------------------------------------
.../flink/graph/spargel/VertexUpdateFunction.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e8802f90/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 9930b50..248925b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -79,7 +79,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
/**
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
- * the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
+ * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
*
* @param vertex The vertex.
@@ -105,10 +105,16 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
/**
* Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+ *
+ * This should be called at most once per updateVertex.
*
* @param newValue The new vertex value.
*/
public void setNewVertexValue(VV newValue) {
+ if(setNewVertexValueCalled) {
+ throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+ }
+ setNewVertexValueCalled = true;
if(isOptDegrees()) {
outValWithDegrees.f1.f0 = newValue;
outWithDegrees.collect(outValWithDegrees);
@@ -178,6 +184,8 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
private long outDegree = -1;
+ private boolean setNewVertexValueCalled;
+
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
}
@@ -185,6 +193,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
this.outVal = outVal;
this.out = out;
+ setNewVertexValueCalled = false;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -192,6 +201,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
Collector out) {
this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
this.outWithDegrees = out;
+ setNewVertexValueCalled = false;
}
/**