You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/12/16 10:24:39 UTC

[1/2] flink git commit: [FLINK-5097][gelly] Add missing input type information to TypeExtractor in MapVertices, mapEdges, fromDataSet, and groupReduceOnEdges

Repository: flink
Updated Branches:
  refs/heads/master 4666e65ef -> 3d41f2b82


[FLINK-5097][gelly] Add missing input type information to TypeExtractor
in MapVertices, mapEdges, fromDataSet, and groupReduceOnEdges

This closes #2842


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88e458b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88e458b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88e458b4

Branch: refs/heads/master
Commit: 88e458b47c27d097c18674f1d5b9630349aeb129
Parents: 4666e65
Author: vasia <va...@apache.org>
Authored: Sat Nov 19 15:35:43 2016 +0100
Committer: vasia <va...@apache.org>
Committed: Fri Dec 16 10:36:49 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |  47 +++-----
 .../test/operations/TypeExtractorTest.java      | 119 +++++++++++++++++++
 2 files changed, 133 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c6843e4..0ee03c2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K, VV, EV> {
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
 		TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
-				MapFunction.class, vertexValueInitializer.getClass(), 1, null, null);
+				MapFunction.class, vertexValueInitializer.getClass(), 1, keyType, null);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
@@ -529,7 +529,7 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
 
 		TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
 				Vertex.class, keyType, valueType);
@@ -573,7 +573,7 @@ public class Graph<K, VV, EV> {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
 
 		TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
 				Edge.class, keyType, keyType, valueType);
@@ -1002,7 +1002,7 @@ public class Graph<K, VV, EV> {
 			return vertices.coGroup(edges).where(0).equalTo(0)
 					.with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges");
 		case ALL:
-			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>())
 						.name("Emit edge"))
 					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
 						.name("GroupReduce on in- and out-edges");
@@ -1039,7 +1039,7 @@ public class Graph<K, VV, EV> {
 						.with(new ApplyCoGroupFunction<>(edgesFunction))
 							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, EV>())
 							.name("Emit edge"))
 						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction))
 							.name("GroupReduce on in- and out-edges").returns(typeInfo);
@@ -1065,24 +1065,12 @@ public class Graph<K, VV, EV> {
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
 			EdgeDirection direction) throws IllegalArgumentException {
 
-		switch (direction) {
-		case IN:
-			return edges.map(new ProjectVertexIdMap<K, EV>(1))
-					.withForwardedFields("f1->f0").name("Vertex ID")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-						.name("GroupReduce on in-edges");
-		case OUT:
-			return edges.map(new ProjectVertexIdMap<K, EV>(0))
-					.withForwardedFields("f0").name("Vertex ID")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-						.name("GroupReduce on out-edges");
-		case ALL:
-			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
-				.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
-					.name("GroupReduce on in- and out-edges");
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
+		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
+		TypeInformation<T> returnType = TypeExtractor.createTypeInfo(EdgesFunction.class, edgesFunction.getClass(), 2,
+			keyType, edgeValueType);
+
+		return groupReduceOnEdges(edgesFunction, direction, returnType);
 	}
 
 	/**
@@ -1115,7 +1103,7 @@ public class Graph<K, VV, EV> {
 						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
 							.name("GroupReduce on out-edges").returns(typeInfo);
 			case ALL:
-				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).name("Emit edge")
+				return edges.flatMap(new EmitOneEdgePerNode<K, EV>()).name("Emit edge")
 						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
 							.name("GroupReduce on in- and out-edges").returns(typeInfo);
 			default:
@@ -1153,8 +1141,7 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
-		Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
+	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T> {
 
 		private EdgesFunction<K, EV, T> function;
 
@@ -1165,14 +1152,9 @@ public class Graph<K, VV, EV> {
 		public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
 			function.iterateEdges(edges, out);
 		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null);
-		}
 	}
 
-	private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
+	private static final class EmitOneEdgePerNode<K, EV> implements FlatMapFunction<
 		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
@@ -1219,7 +1201,6 @@ public class Graph<K, VV, EV> {
 				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
 			}
 		}
-
 		@Override
 		public TypeInformation<T> getProducedType() {
 			return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,

http://git-wip-us.apache.org/repos/asf/flink/blob/88e458b4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
new file mode 100644
index 0000000..484ef3d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.*;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TypeExtractorTest {
+
+	private Graph<Long, Long, Long> inputGraph;
+	private DataSet<Vertex<Long, Long>> vertices;
+	private DataSet<Edge<Long, Long>> edges;
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setUp() throws Exception {
+		env = ExecutionEnvironment.getExecutionEnvironment();
+		vertices = TestGraphUtils.getLongLongVertexData(env);
+		edges = TestGraphUtils.getLongLongEdgeData(env);
+		inputGraph = Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@Test
+	public void testMapVerticesType() throws Exception {
+
+		// test type extraction in mapVertices
+		DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = inputGraph.mapVertices(new VertexMapper<Long>()).getVertices();
+		Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outVertices.getType()));
+	}
+
+	@Test
+	public void testMapEdgesType() throws Exception {
+
+		// test type extraction in mapEdges
+		DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = inputGraph.mapEdges(new EdgeMapper<Long>()).getEdges();
+		Assert.assertTrue(new TupleTypeInfo(Edge.class, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outEdges.getType()));
+	}
+
+	@Test
+	public void testFromDataSet() throws Exception {
+		DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = Graph.fromDataSet(edges, new VertexInitializer<Long>(), env)
+			.getVertices();
+		Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO,
+			new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
+			.equals(outVertices.getType()));
+	}
+
+	@Test
+	public void testGroupReduceOnEdges() throws Exception {
+		DataSet<Tuple2<Long, Long>> output = inputGraph.groupReduceOnEdges(new EdgesGroupFunction<Long, Long>(), EdgeDirection.OUT);
+		Assert.assertTrue((new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)).equals(output.getType()));
+	}
+
+	public static final class VertexMapper<K> implements MapFunction<Vertex<K, Long>, Tuple2<K, Integer>> {
+
+		private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+		@Override
+		public Tuple2<K, Integer> map(Vertex<K, Long> inputVertex) throws Exception {
+			return outTuple;
+		}
+	}
+
+	public static final class EdgeMapper<K> implements MapFunction<Edge<K, Long>, Tuple2<K, Integer>> {
+
+		private final Tuple2<K, Integer> outTuple = new Tuple2<>();
+
+		@Override
+		public Tuple2<K, Integer> map(Edge<K, Long> inputEdge) throws Exception {
+			return outTuple;
+		}
+	}
+
+	public static final class EdgesGroupFunction<K, EV> implements EdgesFunction<K, EV, Tuple2<K, EV>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<Tuple2<K, EV>> out) throws Exception {
+			out.collect(new Tuple2<K, EV>());
+		}
+	}
+
+	public static final class VertexInitializer<K> implements MapFunction<K, Tuple2<K, Integer>> {
+
+		@Override
+		public Tuple2<K, Integer> map(K value) throws Exception {
+			return null;
+		}
+	}
+}


[2/2] flink git commit: [FLINK-5311] [gelly] [docs] Add user documentation for bipartite graph

Posted by va...@apache.org.
[FLINK-5311] [gelly] [docs] Add user documentation for bipartite graph

This closes #2984


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d41f2b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d41f2b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d41f2b8

Branch: refs/heads/master
Commit: 3d41f2b821dc1c7b6496756f037338d2069f9639
Parents: 88e458b
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Sat Dec 10 17:41:30 2016 +0000
Committer: vasia <va...@apache.org>
Committed: Fri Dec 16 10:42:46 2016 +0100

----------------------------------------------------------------------
 docs/dev/libs/gelly/bipartite_graph.md   | 185 ++++++++++++++++++++++++++
 docs/dev/libs/gelly/index.md             |   1 +
 docs/fig/bipartite_graph_projections.png | Bin 0 -> 49335 bytes
 3 files changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d41f2b8/docs/dev/libs/gelly/bipartite_graph.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/bipartite_graph.md b/docs/dev/libs/gelly/bipartite_graph.md
new file mode 100644
index 0000000..ac57e3b
--- /dev/null
+++ b/docs/dev/libs/gelly/bipartite_graph.md
@@ -0,0 +1,185 @@
+---
+title: Bipartite Graph
+nav-parent_id: graphs
+nav-pos: 6
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-danger">Attention</span> Bipartite Graph currently only supported in Gelly Java API.
+
+* This will be replaced by the TOC
+{:toc}
+
+Bipartite Graph
+---------------
+
+A bipartite graph (also called a two-mode graph) is a type of graph where vertices are separated into two disjoint sets. These sets are usually called top and bottom vertices. An edge in this graph can only connect vertices from opposite sets (i.e. bottom vertex to top vertex) and cannot connect two vertices in the same set.
+
+These graphs have wide application in practice and can be a more natural choice for particular domains. For example to represent authorship of scientific papers top vertices can represent scientific papers while bottom nodes will represent authors. Naturally an edge between a top and a bottom nodes would represent an authorship of a particular scientific paper. Another common example for applications of bipartite graphs is relationships between actors and movies. In this case an edge represents that a particular actor played in a movie.
+
+Bipartite graphs are used instead of regular graphs (one-mode) for the following practical [reasons](http://www.complexnetworks.fr/wp-content/uploads/2011/01/socnet07.pdf):
+ * They preserve more information about a connection between vertices. For example instead of a single link between two researchers in a graph that represents that they authored a paper together a bipartite graph preserves the information about what papers they authored
+ * Bipartite graphs can encode the same information more compactly than one-mode graphs
+ 
+
+
+Graph Representation
+--------------------
+
+A `BipartiteGraph` is represented by:
+ * A `DataSet` of top nodes
+ * A `DataSet` of bottom nodes
+ * A `DataSet` of edges between top and bottom nodes
+
+As in the `Graph` class nodes are represented by the `Vertex` type and the same rules apply to its types and values.
+
+The graph edges are represented by the `BipartiteEdge` type. A `BipartiteEdge` is defined by a top ID (the ID of the top `Vertex`), a bottom ID (the ID of the bottom `Vertex`) and an optional value. The main difference between the `Edge` and `BipartiteEdge` is that IDs of nodes it links can be of different types. Edges with no value have a `NullValue` value type.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+BipartiteEdge<Long, String, Double> e = new BipartiteEdge<Long, String, Double>(1L, "id1", 0.5);
+
+Double weight = e.getValue(); // weight = 0.5
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Scala API is not yet supported
+{% endhighlight %}
+</div>
+</div>
+{% top %}
+
+
+Graph Creation
+--------------
+
+You can create a `BipartiteGraph` in the following ways:
+
+* from a `DataSet` of top vertices, a `DataSet` of bottom vertices and a `DataSet` of edges:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Vertex<String, Long>> topVertices = ...
+
+DataSet<Vertex<String, Long>> bottomVertices = ...
+
+DataSet<Edge<String, String, Double>> edges = ...
+
+Graph<String, String, Long, Long, Double> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Scala API is not yet supported
+{% endhighlight %}
+</div>
+</div>
+
+
+Graph Transformations
+---------------------
+
+
+* <strong>Projection</strong>: Projection is a common operation for bipartite graphs that converts a bipartite graph into a regular graph. There are two types of projections: top and bottom projections. Top projection preserves only top nodes in the result graph and creates a link between them in a new graph only if there is an intermediate bottom node both top nodes connect to in the original graph. Bottom projection is the opposite to top projection, i.e. only preserves bottom nodes and connects a pair of nodes if they are connected in the original graph.
+
+<p class="text-center">
+    <img alt="Bipartite Graph Projections" width="80%" src="{{ site.baseurl }}/fig/bipartite_graph_projections.png"/>
+</p>
+
+Gelly supports two sub-types of projections: simple projections and full projections. The only difference between them is what data is associated with edges in the result graph.
+
+In the case of a simple projection each node in the result graph contains a pair of values of bipartite edges that connect nodes in the original graph:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// Vertices (1, "top1")
+DataSet<Vertex<Long, String>> topVertices = ...
+
+// Vertices (2, "bottom2"); (4, "bottom4")
+DataSet<Vertex<Long, String>> bottomVertices = ...
+
+// Edge that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
+// (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
+DataSet<Edge<Long, Long, String>> edges = ...
+
+BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
+
+// Result graph with two vertices:
+// (2, "bottom2"); (4, "bottom4")
+//
+// and one edge that contains ids of bottom edges and a tuple with
+// values of intermediate edges in the original bipartite graph:
+// (2, 4, ("1-2-edge", "1-4-edge"))
+Graph<Long, String, Tuple2<String, String>> graph bipartiteGraph.projectionBottomSimple();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Scala API is not yet supported
+{% endhighlight %}
+</div>
+</div>
+
+Full projection preserves all the information about the connection between two vertices and stores it in `Projection` instances. This includes value and id of an intermediate vertex, source and target vertex values and source and target edge values:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// Vertices (1, "top1")
+DataSet<Vertex<Long, String>> topVertices = ...
+
+// Vertices (2, "bottom2"); (4, "bottom4")
+DataSet<Vertex<Long, String>> bottomVertices = ...
+
+// Edge that connect vertex 2 to vertex 1 and vertex 4 to vertex 1:
+// (1, 2, "1-2-edge"); (1, 4, "1-4-edge")
+DataSet<Edge<Long, Long, String>> edges = ...
+
+BipartiteGraph<Long, Long, String, String, String> graph = BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, env);
+
+// Result graph with two vertices:
+// (2, "bottom2"); (4, "bottom4")
+// and one edge that contains ids of bottom edges and a tuple that 
+// contains id and value of the intermediate edge, values of connected vertices
+// and values of intermediate edges in the original bipartite graph:
+// (2, 4, (1, "top1", "bottom2", "bottom4", "1-2-edge", "1-4-edge"))
+Graph<String, String, Projection<Long, String, String, String>> graph bipartiteGraph.projectionBottomFull();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Scala API is not yet supported
+{% endhighlight %}
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/3d41f2b8/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 0877e2f..6bcdc82 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -33,6 +33,7 @@ Gelly is a Graph API for Flink. It contains a set of methods and utilities which
 * [Library Methods](library_methods.html)
 * [Graph Algorithms](graph_algorithms.html)
 * [Graph Generators](graph_generators.html)
+* [Bipartite Graphs](bipartite_graph.html)
 
 Using Gelly
 -----------

http://git-wip-us.apache.org/repos/asf/flink/blob/3d41f2b8/docs/fig/bipartite_graph_projections.png
----------------------------------------------------------------------
diff --git a/docs/fig/bipartite_graph_projections.png b/docs/fig/bipartite_graph_projections.png
new file mode 100644
index 0000000..e3be4ec
Binary files /dev/null and b/docs/fig/bipartite_graph_projections.png differ