You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/22 13:53:41 UTC

flink git commit: [FLINK-2887] [gelly] make sendMessageToAllNeighbors respect the EdgeDirection if set in the configuration

Repository: flink
Updated Branches:
  refs/heads/master 2122cf4cb -> 56146ae85


[FLINK-2887] [gelly] make sendMessageToAllNeighbors respect the EdgeDirection if set in the configuration

This closes #1281


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56146ae8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56146ae8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56146ae8

Branch: refs/heads/master
Commit: 56146ae854a2ca78750aabf06dc5ea08bc4f5414
Parents: 2122cf4
Author: vasia <va...@apache.org>
Authored: Wed Oct 21 16:32:22 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Oct 22 13:52:48 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/spargel/MessagingFunction.java  |  51 +++++++--
 .../graph/spargel/VertexCentricIteration.java   |   4 +-
 .../test/VertexCentricConfigurationITCase.java  | 109 +++++++++++++++++++
 3 files changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index 4245c24..271db86 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -115,8 +115,12 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 	/**
 	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
 	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+	 * <p>
+	 * If the {@link EdgeDirection} is OUT (default), then this iterator contains outgoing edges.
+	 * If the {@link EdgeDirection} is IN, then this iterator contains incoming edges.
+	 * If the {@link EdgeDirection} is ALL, then this iterator contains both outgoing and incoming edges.
 	 * 
-	 * @return An iterator with all outgoing edges.
+	 * @return An iterator with all edges.
 	 */
 	@SuppressWarnings("unchecked")
 	public Iterable<Edge<K, EV>> getEdges() {
@@ -129,24 +133,54 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 	}
 
 	/**
-	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+	 * Sends the given message to all neighbors of the changed vertex, as selected by the configured {@link EdgeDirection}.
 	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+	 * <p>
+	 * If the {@link EdgeDirection} is OUT (default), the message will be sent to out-neighbors.
+	 * If the {@link EdgeDirection} is IN, the message will be sent to in-neighbors.
+	 * If the {@link EdgeDirection} is ALL, the message will be sent to all neighbors.
 	 * 
 	 * @param m The message to send.
 	 */
 	public void sendMessageToAllNeighbors(Message m) {
 		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()'"
+					+ " exactly once.");
 		}
 		
 		edgesUsed = true;
-		
 		outValue.f1 = m;
 		
 		while (edges.hasNext()) {
 			Tuple next = (Tuple) edges.next();
-			K k = next.getField(1);
-			outValue.f0 = k;
+
+			/*
+			 * When EdgeDirection is OUT, the edges iterator only has the out-edges 
+			 * of the vertex, i.e. the ones where this vertex is src. 
+			 * next.getField(1) gives the neighbor of the vertex running this MessagingFunction.
+			 */
+			if (getDirection().equals(EdgeDirection.OUT)) {
+				outValue.f0 = next.getField(1);
+			}
+			/*
+			 * When EdgeDirection is IN, the edges iterator only has the in-edges 
+			 * of the vertex, i.e. the ones where this vertex is trg. 
+			 * next.getField(0) gives the neighbor of the vertex running this MessagingFunction.
+			 */
+			else if (getDirection().equals(EdgeDirection.IN)) {
+				outValue.f0 = next.getField(0);
+			}
+			 // When EdgeDirection is ALL, the edges iterator contains both in- and out- edges
+			else if (getDirection().equals(EdgeDirection.ALL)) {
+				if (next.getField(0).equals(vertexId)) {
+					// send msg to the trg
+					outValue.f0 = next.getField(1);
+				}
+				else {
+					// send msg to the src
+					outValue.f0 = next.getField(0);
+				}
+			}
 			out.collect(outValue);
 		}
 	}
@@ -219,6 +253,8 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 	private Iterator<?> edges;
 	
 	private Collector<Tuple2<K, Message>> out;
+
+	private K vertexId;
 	
 	private EdgesIterator<K, EV> edgeIterator;
 	
@@ -234,9 +270,10 @@ public abstract class MessagingFunction<K, VV, Message, EV> implements Serializa
 		this.edgeIterator = new EdgesIterator<K, EV>();
 	}
 	
-	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
+	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
 		this.edges = edges;
 		this.out = out;
+		this.vertexId = id;
 		this.edgesUsed = false;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index b3a470e..fdc39ff 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -405,7 +405,7 @@ public class VertexCentricIteration<K, VV, Message, EV>
 		
 			if (stateIter.hasNext()) {
 				Vertex<K, VV> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.set((Iterator<?>) edges.iterator(), out, newVertexState.getId());
 				messagingFunction.sendMessages(newVertexState);
 			}
 		}
@@ -437,7 +437,7 @@ public class VertexCentricIteration<K, VV, Message, EV>
 				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
 				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
 
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
 				messagingFunction.sendMessages(nextVertex);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/56146ae8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 0feb3fb..567b194 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -243,6 +243,106 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testSendToAllDirectionIN() throws Exception {
+
+		/*
+		 * Test that sendMessageToAllNeighbors() works correctly
+		 * when the direction is set to IN
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// configure the iteration
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		parameters.setDirection(EdgeDirection.IN);
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.getVertices();
+
+        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[2, 3]\n" +
+				"2,[3]\n" +
+				"3,[4, 5]\n" +
+				"4,[5]\n" +
+				"5,[1]";
+		
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testSendToAllDirectionOUT() throws Exception {
+
+		/*
+		 * Test that sendMessageToAllNeighbors() works correctly
+		 * when the direction is set to OUT
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// configure the iteration
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		parameters.setDirection(EdgeDirection.OUT);
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.getVertices();
+
+        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[5]\n" +
+				"2,[1]\n" +
+				"3,[1, 2]\n" +
+				"4,[3]\n" +
+				"5,[3, 4]";
+		
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testSendToAllDirectionALL() throws Exception {
+
+		/*
+		 * Test that sendMessageToAllNeighbors() works correctly
+		 * when the direction is set to ALL
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env)
+				.mapVertices(new InitialiseHashSetMapper());
+
+		// configure the iteration
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		parameters.setDirection(EdgeDirection.ALL);
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph
+				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
+				.getVertices();
+
+        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[2, 3, 5]\n" +
+				"2,[1, 3]\n" +
+				"3,[1, 2, 4, 5]\n" +
+				"4,[3, 5]\n" +
+				"5,[1, 3, 4]";
+		
+		compareResultAsTuples(result, expectedResult);
+	}
+
+
+	@Test
 	public void testNumVerticesNotSet() throws Exception {
 
 		/*
@@ -645,6 +745,15 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
+	public static final class SendMsgToAll extends MessagingFunction<Long, HashSet<Long>, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+			sendMessageToAllNeighbors(vertex.getId());
+		}
+	}
+
+	@SuppressWarnings("serial")
 	public static final class IdMessenger extends MessagingFunction<Long, Boolean, Long, Long> {
 
 		@Override