You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/07/12 11:24:45 UTC
flink git commit: [FLINK-2141][gelly] Allow GSA's Gather to perform
this operation in IN, OUT or ALL directions
Repository: flink
Updated Branches:
refs/heads/master 3b69b2499 -> 9350264bd
[FLINK-2141][gelly] Allow GSA's Gather to perform this operation in IN, OUT or ALL directions
[FLINK-2141][gelly] Added Test cases for GSAConfiguration setDirection. Made changes in coding style
[FLINK-2141][gelly]Removed Example
[FLINK-2141][gelly] Corrected Annotation and Gelly guide
Minor Changes in Gelly Guide
This closes #877
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9350264b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9350264b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9350264b
Branch: refs/heads/master
Commit: 9350264bdda2b4a5e230c4621a612e38a0f77be1
Parents: 3b69b24
Author: Shivani <sh...@gmail.com>
Authored: Wed Jul 1 03:03:58 2015 +0200
Committer: andra <an...@apache.org>
Committed: Sun Jul 12 11:18:27 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 20 +++
.../flink/graph/gsa/GSAConfiguration.java | 23 +++
.../graph/gsa/GatherSumApplyIteration.java | 47 +++++-
.../test/GatherSumApplyConfigurationITCase.java | 157 +++++++++++++++++++
4 files changed, 244 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 1e9bf48..e0e05b3 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -693,6 +693,9 @@ Currently, the following parameters can be specified:
* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
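+* <strong>Neighbor Direction</strong>: By default (`EdgeDirection.OUT`), values flow along the edges from source to target, i.e. each vertex gathers values from its in-neighbors. This can be modified
+using the `setDirection()` method: `EdgeDirection.IN` reverses the flow (each vertex gathers from its out-neighbors) and `EdgeDirection.ALL` gathers along both directions.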
+
The following example illustrates the usage of the number of vertices option.
{% highlight java %}
@@ -734,6 +737,23 @@ public static final class Apply {
{% endhighlight %}
+The following example illustrates the usage of the neighbor direction option.
+{% highlight java %}
+
+Graph<Long, HashSet<Long>, Double> graph = ...
+
+// configure the iteration
+GSAConfiguration parameters = new GSAConfiguration();
+
+// set the neighbor direction: with IN, each vertex gathers from its out-neighbors (the default, OUT, gathers from in-neighbors)
+parameters.setDirection(EdgeDirection.IN);
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+DataSet<Vertex<Long, HashSet<Long>>> result =
+ graph.runGatherSumApplyIteration(
+ new Gather(), new Sum(), new Apply(), maxIterations, parameters)
+ .getVertices();
+{% endhighlight %}
[Back to top](#top)
### Vertex-centric and GSA Comparison
http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index de47280..8d24f16 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.gsa;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.IterationConfiguration;
import java.util.ArrayList;
@@ -46,6 +47,8 @@ public class GSAConfiguration extends IterationConfiguration {
/** the broadcast variables for the apply function **/
private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>();
+ private EdgeDirection direction = EdgeDirection.OUT;
+
public GSAConfiguration() {}
/**
@@ -107,4 +110,24 @@ public class GSAConfiguration extends IterationConfiguration {
public List<Tuple2<String, DataSet<?>>> getApplyBcastVars() {
return this.bcVarsApply;
}
+
+ /**
+ * Gets the direction along which neighbor values are gathered. The default, {@link EdgeDirection#OUT},
+ * propagates values from edge sources to edge targets, i.e. each vertex gathers from its in-neighbors.
+ *
+ * @return the configured EdgeDirection: IN, OUT or ALL.
+ */
+ public EdgeDirection getDirection() {
+ return direction;
+ }
+
+ /**
+ * Sets the direction along which neighbor values are gathered. IN reverses the default flow
+ * (each vertex gathers from its out-neighbors); ALL gathers along both edge directions.
+ *
+ * @param direction IN, OUT or ALL; not null-checked here, so a null value only fails when the iteration is built
+ */
+ public void setDirection(EdgeDirection direction) {
+ this.direction = direction;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 389cf02..4c91089 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
@@ -64,6 +65,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
private final SumFunction<VV, EV, M> sum;
private final ApplyFunction<K, VV, M> apply;
private final int maximumNumberOfIterations;
+ private EdgeDirection direction = EdgeDirection.OUT;
private GSAConfiguration configuration;
@@ -163,9 +165,34 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
}
// Prepare the neighbors
- DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+ if(this.configuration != null) {
+ direction = this.configuration.getDirection();
+ }
+ DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
+ switch(direction) {
+ case OUT:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+ break;
+ case IN:
+ neighbors = iteration
.getWorkset().join(edgeDataSet)
- .where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>());
+ .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
+ break;
+ case ALL:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+ break;
+ default:
+ neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+ break;
+ }
// Gather, sum and apply
MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
@@ -358,7 +385,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
@SuppressWarnings("serial")
@ForwardedFieldsSecond("f1->f0")
- private static final class ProjectKeyWithNeighbor<K, VV, EV> implements FlatJoinFunction<
+ private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
@@ -367,6 +394,20 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
}
}
+ /**
+ * Join for the IN direction (also used for ALL): the vertex matched on the edge target (f1) becomes a neighbor
+ * keyed by the edge source (edge f0 forwarded to output f0), so each source gathers from its out-neighbors.
+ */
+ @SuppressWarnings("serial")
+ @ForwardedFieldsSecond({"f0"})
+ private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
+ Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+ public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+ out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+ edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ }
+ }
/**
* Configures this gather-sum-apply iteration with the provided parameters.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 701eda9..53455c5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
@@ -38,6 +40,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.HashSet;
import java.util.List;
@RunWith(Parameterized.class)
@@ -118,6 +121,117 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
compareResultAsTuples(result, expectedResult);
}
+ @Test
+ public void testIterationDefaultDirection() throws Exception {
+
+ /*
+ * Test that if no direction parameter is given, the iteration uses the default direction (OUT):
+ * values flow from edge sources to edge targets, so each vertex gathers from its in-neighbors.
+ * With the Gather, Sum and Apply functions below, each vertex ends up with the set of vertices
+ * that have a path to it (including itself). edges.remove(0) drops the first test edge; the expected
+ * output implies that vertex 2 is then reachable only from itself.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+ edges.remove(0);
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+ .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+ new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4)
+ .getVertices();
+
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[1, 2, 3, 4, 5]\n"
+ +"2,[2]\n"
+ +"3,[1, 2, 3, 4, 5]\n"
+ +"4,[1, 2, 3, 4, 5]\n"
+ +"5,[1, 2, 3, 4, 5]\n";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testIterationDirectionIN() throws Exception {
+
+ /*
+ * Test that with the direction parameter IN, values flow from edge targets to edge sources,
+ * so each vertex gathers from its out-neighbors. With the Gather, Sum and Apply functions below,
+ * each vertex ends up with the set of vertices reachable from it (including itself).
+ * Per the expected output, no other vertex reaches vertex 2 once the first test edge is removed.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ GSAConfiguration parameters = new GSAConfiguration();
+
+ parameters.setDirection(EdgeDirection.IN);
+
+ List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+ edges.remove(0);
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+ .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+ new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+ parameters)
+ .getVertices();
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[1, 3, 4, 5]\n"
+ +"2,[1, 2, 3, 4, 5]\n"
+ +"3,[1, 3, 4, 5]\n"
+ +"4,[1, 3, 4, 5]\n"
+ +"5,[1, 3, 4, 5]\n";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testIterationDirectionALL() throws Exception {
+
+ /*
+ * Test that with the direction parameter ALL, values are gathered along both IN and OUT edges,
+ * i.e. edge direction is ignored. With the Gather, Sum and Apply functions below, each vertex ends up
+ * with the set of vertices connected to it through some undirected path (including itself);
+ * the graph stays connected after removing the first edge, so every vertex collects all five ids.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ GSAConfiguration parameters = new GSAConfiguration();
+ parameters.setDirection(EdgeDirection.ALL);
+
+ List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges();
+
+ edges.remove(0);
+
+ Graph<Long, HashSet<Long>, Long> graph = Graph
+ .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+ .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+ DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration(
+ new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+ parameters)
+ .getVertices();
+
+ List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+
+ expectedResult = "1,[1, 2, 3, 4, 5]\n"
+ +"2,[1, 2, 3, 4, 5]\n"
+ +"3,[1, 2, 3, 4, 5]\n"
+ +"4,[1, 2, 3, 4, 5]\n"
+ +"5,[1, 2, 3, 4, 5]\n";
+
+ compareResultAsTuples(result, expectedResult);
+ }
+
@SuppressWarnings("serial")
private static final class Gather extends GatherFunction<Long, Long, Long> {
@@ -243,4 +357,47 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
return 1l;
}
}
+
+ @SuppressWarnings("serial")
+ public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+ // Seeds each vertex value with a singleton set holding the vertex's own id (the old Long value is discarded).
+ @Override
+ public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
+ HashSet<Long> h = new HashSet<Long>();
+ h.add(value.getId());
+ return h;
+ }
+ }
+
+ private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
+ // Gather: passes the neighbor's current set of ids through unchanged; the edge value is not used.
+ @Override
+ public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ }
+
+ private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
+ @Override
+ public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
+ HashSet<Long> set = currentSet; // Sum (set union): alias, not a copy - currentSet is mutated in place and returned
+ for(Long l : newSet) {
+ set.add(l);
+ }
+ return set;
+ }
+ }
+
+ private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
+ // Apply: unions currentValue into newValue (mutating newValue) and calls setResult only if that added new ids.
+ @Override
+ public void apply(HashSet<Long> newValue, HashSet<Long> currentValue) {
+ newValue.addAll(currentValue);
+ if(newValue.size()>currentValue.size()) {
+ setResult(newValue);
+ }
+ }
+ }
+
+
}