You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/22 13:53:41 UTC
flink git commit: [FLINK-2887] [gelly] make sendMessageToAllNeighbors
respect the EdgeDirection if set in the configuration
Repository: flink
Updated Branches:
refs/heads/master 2122cf4cb -> 56146ae85
[FLINK-2887] [gelly] make sendMessageToAllNeighbors respect the EdgeDirection if set in the configuration
This closes #1281
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56146ae8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56146ae8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56146ae8
Branch: refs/heads/master
Commit: 56146ae854a2ca78750aabf06dc5ea08bc4f5414
Parents: 2122cf4
Author: vasia <va...@apache.org>
Authored: Wed Oct 21 16:32:22 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Oct 22 13:52:48 2015 +0200
----------------------------------------------------------------------
.../flink/graph/spargel/MessagingFunction.java | 51 +++++++--
.../graph/spargel/VertexCentricIteration.java | 4 +-
.../test/VertexCentricConfigurationITCase.java | 109 +++++++++++++++++++
3 files changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index 4245c24..271db86 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -115,8 +115,12 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
/**
* Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
* {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+ * <p>
+ * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
+ * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
+ * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
*
- * @return An iterator with all outgoing edges.
+ * @return An iterator with all edges.
*/
@SuppressWarnings("unchecked")
public Iterable<Edge<K, EV>> getEdges() {
@@ -129,24 +133,54 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
}
/**
- * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+ * Sends the given message to all vertices that are targets of an edge of the changed vertex.
* This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+ * <p>
+ * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
+ * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
+ * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
*
* @param m The message to send.
*/
public void sendMessageToAllNeighbors(Message m) {
if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
+ + "exactly once.");
}
edgesUsed = true;
-
outValue.f1 = m;
while (edges.hasNext()) {
Tuple next = (Tuple) edges.next();
- K k = next.getField(1);
- outValue.f0 = k;
+
+ /*
+ * When EdgeDirection is OUT, the edges iterator only has the out-edges
+ * of the vertex, i.e. the ones where this vertex is src.
+ * next.getField(1) gives the neighbor of the vertex running this MessagingFunction.
+ */
+ if (getDirection().equals(EdgeDirection.OUT)) {
+ outValue.f0 = next.getField(1);
+ }
+ /*
+ * When EdgeDirection is IN, the edges iterator only has the in-edges
+ * of the vertex, i.e. the ones where this vertex is trg.
+ * next.getField(10) gives the neighbor of the vertex running this MessagingFunction.
+ */
+ else if (getDirection().equals(EdgeDirection.IN)) {
+ outValue.f0 = next.getField(0);
+ }
+ // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges
+ if (getDirection().equals(EdgeDirection.ALL)) {
+ if (next.getField(0).equals(vertexId)) {
+ // send msg to the trg
+ outValue.f0 = next.getField(1);
+ }
+ else {
+ // send msg to the src
+ outValue.f0 = next.getField(0);
+ }
+ }
out.collect(outValue);
}
}
@@ -219,6 +253,8 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
private Iterator<?> edges;
private Collector<Tuple2<K, Message>> out;
+
+ private K vertexId;
private EdgesIterator<K, EV> edgeIterator;
@@ -234,9 +270,10 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
this.edgeIterator = new EdgesIterator<K, EV>();
}
- void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
+ void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
this.edges = edges;
this.out = out;
+ this.vertexId = id;
this.edgesUsed = false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index b3a470e..fdc39ff 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -405,7 +405,7 @@ public class VertexCentricIteration<K, VV, Message, EV>
if (stateIter.hasNext()) {
Vertex<K, VV> newVertexState = stateIter.next();
- messagingFunction.set((Iterator<?>) edges.iterator(), out);
+ messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId());
messagingFunction.sendMessages(newVertexState);
}
}
@@ -437,7 +437,7 @@ public class VertexCentricIteration<K, VV, Message, EV>
messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
- messagingFunction.set((Iterator<?>) edges.iterator(), out);
+ messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
messagingFunction.sendMessages(nextVertex);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 0feb3fb..567b194 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -243,6 +243,106 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testSendToAllDirectionIN() throws Exception {
+
+ /*
+ * Test that sendMessageToAllNeighbors() works correctly
+ * when the direction is set to IN
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+ .mapVertices(new InitialiseHashSetMapper());
+
+ // configure the iteration
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+ parameters.setDirection(EdgeDirection.IN);
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+ .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+ .getVertices();
+
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[2, 3]\n" +
+ "2,[3]\n" +
+ "3,[4, 5]\n" +
+ "4,[5]\n" +
+ "5,[1]";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testSendToAllDirectionOUT() throws Exception {
+
+ /*
+ * Test that sendMessageToAllNeighbors() works correctly
+ * when the direction is set to OUT
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+ .mapVertices(new InitialiseHashSetMapper());
+
+ // configure the iteration
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+ parameters.setDirection(EdgeDirection.OUT);
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+ .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+ .getVertices();
+
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[5]\n" +
+ "2,[1]\n" +
+ "3,[1, 2]\n" +
+ "4,[3]\n" +
+ "5,[3, 4]";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testSendToAllDirectionALL() throws Exception {
+
+ /*
+ * Test that sendMessageToAllNeighbors() works correctly
+ * when the direction is set to ALL
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+ .mapVertices(new InitialiseHashSetMapper());
+
+ // configure the iteration
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+ parameters.setDirection(EdgeDirection.ALL);
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+ .runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+ .getVertices();
+
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[2, 3, 5]\n" +
+ "2,[1, 3]\n" +
+ "3,[1, 2, 4, 5]\n" +
+ "4,[3, 5]\n" +
+ "5,[1, 3, 4]";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+
+ @Test
public void testNumVerticesNotSet() throws Exception {
/*
@@ -645,6 +745,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
+ public static final class SendMsgToAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+ @Override
+ public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+ sendMessageToAllNeighbors(vertex.getId());
+ }
+ }
+
+ @SuppressWarnings("serial")
public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> {
@Override