You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/07/12 11:24:45 UTC

flink git commit: [FLINK-2141][gelly] Allow GSA's Gather to perform this operation in IN, OUT or ALL directions

Repository: flink
Updated Branches:
  refs/heads/master 3b69b2499 -> 9350264bd


[FLINK-2141][gelly] Allow GSA's Gather to perform this operation in IN, OUT or ALL directions

[FLINK-2141][gelly] Added Test cases for GSAConfiguration setDirection. Made changes in coding style

[FLINK-2141][gelly]Removed Example

[FLINK-2141][gelly] Corrected Annotation and Gelly guide

Minor Changes in Gelly Guide

This closes #877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9350264b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9350264b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9350264b

Branch: refs/heads/master
Commit: 9350264bdda2b4a5e230c4621a612e38a0f77be1
Parents: 3b69b24
Author: Shivani <sh...@gmail.com>
Authored: Wed Jul 1 03:03:58 2015 +0200
Committer: andra <an...@apache.org>
Committed: Sun Jul 12 11:18:27 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  20 +++
 .../flink/graph/gsa/GSAConfiguration.java       |  23 +++
 .../graph/gsa/GatherSumApplyIteration.java      |  47 +++++-
 .../test/GatherSumApplyConfigurationITCase.java | 157 +++++++++++++++++++
 4 files changed, 244 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 1e9bf48..e0e05b3 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -693,6 +693,9 @@ Currently, the following parameters can be specified:
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
 The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
 
+* <strong>Neighbor Direction</strong>: By default, values are gathered from the out-neighbors of the vertex. This can be modified
+using the `setDirection()` method, which accepts `EdgeDirection.IN`, `EdgeDirection.OUT` or `EdgeDirection.ALL`.
+
 The following example illustrates the usage of the number of vertices option.
 
 {% highlight java %}
@@ -734,6 +737,23 @@ public static final class Apply {
 
 {% endhighlight %}
 
+The following example illustrates the usage of the edge direction option.
+{% highlight java %}
+
+Graph<Long, HashSet<Long>, Double> graph = ...
+
+// configure the iteration
+GSAConfiguration parameters = new GSAConfiguration();
+
+// set the messaging direction
+parameters.setDirection(EdgeDirection.IN);
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+DataSet<Vertex<Long, HashSet<Long>>> result =
+			graph.runGatherSumApplyIteration(
+			new Gather(), new Sum(), new Apply(), maxIterations, parameters)
+			.getVertices();
+{% endhighlight %}
 [Back to top](#top)
 
 ### Vertex-centric and GSA Comparison

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index de47280..8d24f16 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.gsa;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.IterationConfiguration;
 
 import java.util.ArrayList;
@@ -46,6 +47,8 @@ public class GSAConfiguration extends IterationConfiguration {
 	/** the broadcast variables for the apply function **/
 	private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>();
 
+	private EdgeDirection direction = EdgeDirection.OUT;
+
 	public GSAConfiguration() {}
 
 	/**
@@ -107,4 +110,24 @@ public class GSAConfiguration extends IterationConfiguration {
 	public List<Tuple2<String, DataSet<?>>> getApplyBcastVars() {
 		return this.bcVarsApply;
 	}
+
+	/**
+	 * Gets the direction from which the neighbors are to be selected.
+	 * By default, the neighbors that are the targets of the edges are selected.
+	 *
+	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	/**
+	 * Sets the direction in which neighbors are to be selected.
+	 * By default, the neighbors that are the targets of the edges are selected.
+	 *
+	 * @param direction - IN, OUT or ALL
+	 */
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 389cf02..4c91089 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
@@ -64,6 +65,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 	private final SumFunction<VV, EV, M> sum;
 	private final ApplyFunction<K, VV, M> apply;
 	private final int maximumNumberOfIterations;
+	private EdgeDirection direction = EdgeDirection.OUT;
 
 	private GSAConfiguration configuration;
 
@@ -163,9 +165,34 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		}
 
 		// Prepare the neighbors
-		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+		if(this.configuration != null) {
+			direction = this.configuration.getDirection();
+		}
+		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
+		switch(direction) {
+			case OUT:
+				neighbors = iteration
+				.getWorkset().join(edgeDataSet)
+				.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+				break;
+			case IN:
+				neighbors = iteration
 				.getWorkset().join(edgeDataSet)
-				.where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>());
+				.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
+				break;
+			case ALL:
+				neighbors =  iteration
+						.getWorkset().join(edgeDataSet)
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+								.getWorkset().join(edgeDataSet)
+								.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+				break;
+			default:
+				neighbors = iteration
+						.getWorkset().join(edgeDataSet)
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+				break;
+		}
 
 		// Gather, sum and apply
 		MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
@@ -358,7 +385,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 	@SuppressWarnings("serial")
 	@ForwardedFieldsSecond("f1->f0")
-	private static final class ProjectKeyWithNeighbor<K, VV, EV> implements FlatJoinFunction<
+	private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
 			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
 
 		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
@@ -367,6 +394,20 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		}
 	}
 
+	@SuppressWarnings("serial")
+	@ForwardedFieldsSecond({"f0"})
+	private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
+			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+					edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+		}
+	}
+
+
+
+
 	/**
 	 * Configures this gather-sum-apply iteration with the provided parameters.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 701eda9..53455c5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
@@ -38,6 +40,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.HashSet;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -118,6 +121,117 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		compareResultAsTuples(result, expectedResult);
 	}
 
+	@Test
+	public void testIterationDefaultDirection() throws Exception {
+
+		/*
+		 * Test that if no direction parameter is given, the iteration works as before
+		 * (i.e. it gathers information from the IN edges and neighbors, and the information is calculated for an OUT edge;
+		 * the default direction parameter is OUT for the GatherSumApplyIterations).
+		 * When data is gathered from the IN edges, the Gather, Sum and Apply functions
+		 * set the set of vertices which have a path to a vertex as the value of that vertex.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4)
+				.getVertices();
+
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 2, 3, 4, 5]\n"
+						+"2,[2]\n"
+						+"3,[1, 2, 3, 4, 5]\n"
+						+"4,[1, 2, 3, 4, 5]\n"
+						+"5,[1, 2, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationDirectionIN() throws Exception {
+
+		/*
+		 * Test that if the direction parameter IN is given, the iteration works as expected
+		 * (i.e. it gathers information from the OUT edges and neighbors, and the information is calculated for an IN edge).
+		 * When data is gathered from the OUT edges, the Gather, Sum and Apply functions
+		 * set the set of vertices which have a path from a vertex as the value of that vertex.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		GSAConfiguration parameters = new GSAConfiguration();
+
+		parameters.setDirection(EdgeDirection.IN);
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+																								parameters)
+				.getVertices();
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 3, 4, 5]\n"
+				+"2,[1, 2, 3, 4, 5]\n"
+				+"3,[1, 3, 4, 5]\n"
+				+"4,[1, 3, 4, 5]\n"
+				+"5,[1, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testIterationDirectionALL() throws Exception {
+
+		/*
+		 * Test that if the direction parameter ALL is given, the iteration works as expected
+		 * (i.e. it gathers information from both IN and OUT edges and neighbors).
+		 * When data is gathered from ALL edges, the Gather, Sum and Apply functions
+		 * set the set of vertices which are connected to a vertex through some path as the value of that vertex.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setDirection(EdgeDirection.ALL);
+
+		List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+		edges.remove(0);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph
+				.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+				.mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+		DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+				new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+				parameters)
+				.getVertices();
+
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+		expectedResult = "1,[1, 2, 3, 4, 5]\n"
+				+"2,[1, 2, 3, 4, 5]\n"
+				+"3,[1, 2, 3, 4, 5]\n"
+				+"4,[1, 2, 3, 4, 5]\n"
+				+"5,[1, 2, 3, 4, 5]\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
 	@SuppressWarnings("serial")
 	private static final class Gather extends GatherFunction<Long, Long, Long> {
 
@@ -243,4 +357,47 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 			return 1l;
 		}
 	}
+
+	@SuppressWarnings("serial")
+	public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+			HashSet<Long> h = new HashSet<Long>();
+			h.add(value.getId());
+			return h;
+		}
+	}
+
+	private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
+		@Override
+		public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
+			HashSet<Long> set = currentSet;
+			for(Long l : newSet) {
+				set.add(l);
+			}
+			return set;
+		}
+	}
+
+	private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
+
+		@Override
+		public void apply(HashSet<Long> newValue, HashSet<Long> currentValue) {
+			newValue.addAll(currentValue);
+			if(newValue.size()>currentValue.size()) {
+				setResult(newValue);
+			}
+		}
+	}
+
+
 }