You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/12/09 20:12:11 UTC

flink git commit: [FLINK-4646] [gelly] Add BipartiteGraph

Repository: flink
Updated Branches:
  refs/heads/master 9ab494a73 -> 365cd987c


[FLINK-4646] [gelly] Add BipartiteGraph

This closes #2564


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/365cd987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/365cd987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/365cd987

Branch: refs/heads/master
Commit: 365cd987cc90fa9b399acbb4fe0af3f995f604e3
Parents: 9ab494a
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Tue Sep 27 23:14:09 2016 +0100
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Dec 9 13:58:50 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Edge.java  |  12 +-
 .../flink/graph/bipartite/BipartiteEdge.java    |  68 ++++
 .../flink/graph/bipartite/BipartiteGraph.java   | 319 +++++++++++++++++++
 .../flink/graph/bipartite/Projection.java       |  76 +++++
 .../graph/bipartite/BipartiteEdgeTest.java      |  70 ++++
 .../graph/bipartite/BipartiteGraphTest.java     | 146 +++++++++
 .../flink/graph/bipartite/ProjectionTest.java   |  76 +++++
 .../apache/flink/graph/generator/TestUtils.java |  11 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  74 ++---
 9 files changed, 807 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index 2bcce29..8e5f916 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -34,10 +34,10 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
 
 	public Edge(){}
 
-	public Edge(K src, K trg, V val) {
-		this.f0 = src;
-		this.f1 = trg;
-		this.f2 = val;
+	public Edge(K source, K target, V value) {
+		this.f0 = source;
+		this.f1 = target;
+		this.f2 = value;
 	}
 
 	/**
@@ -49,8 +49,8 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
 			return new Edge<>(this.f1, this.f0, this.f2);
 	}
 
-	public void setSource(K src) {
-		this.f0 = src;
+	public void setSource(K source) {
+		this.f0 = source;
 	}
 
 	public K getSource() {

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
new file mode 100644
index 0000000..167e4ec
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+
+/**
+ * A BipartiteEdge represents a link between top and bottom vertices
+ * in a {@link BipartiteGraph}. It is a generalized form of {@link Edge}
+ * where the source and target vertex IDs can be of different types.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
+
+	private static final long serialVersionUID = 1L;
+
+	public BipartiteEdge() {}
+
+	public BipartiteEdge(KT topId, KB bottomId, EV value) {
+		this.f0 = topId;
+		this.f1 = bottomId;
+		this.f2 = value;
+	}
+
+	public KT getTopId() {
+		return this.f0;
+	}
+
+	public void setTopId(KT topId) {
+		this.f0 = topId;
+	}
+
+	public KB getBottomId() {
+		return this.f1;
+	}
+
+	public void setBottomId(KB bottomId) {
+		this.f1 = bottomId;
+	}
+
+	public EV getValue() {
+		return this.f2;
+	}
+
+	public void setValue(EV value) {
+		this.f2 = value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
new file mode 100644
index 0000000..b325103
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The vertices of a bipartite graph are divided into two disjoint sets, referenced by the names "top" and "bottom".
+ * Top and bottom vertices with the same key value represent distinct entities and must be specially handled
+ * when projecting to a simple {@link Graph}. Edges can only exist between a pair of vertices from different vertex
+ * sets. E.g. there can be no edges between a pair of top vertices.
+ *
+ * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * <p>The bipartite graph interface differs from the {@link Graph} interface, so to apply algorithms that work on a
+ * regular graph, a bipartite graph should first be converted into a {@link Graph} instance. This can be achieved by using
+ * the projection methods.
+ *
+ * @param <KT> the key type of top vertices
+ * @param <KB> the key type of bottom vertices
+ * @param <VVT> the vertex value type of top vertices
+ * @param <VVB> the vertex value type of bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+
+	private final ExecutionEnvironment context;
+	private final DataSet<Vertex<KT, VVT>> topVertices;
+	private final DataSet<Vertex<KB, VVB>> bottomVertices;
+	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+	private BipartiteGraph(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		this.topVertices = topVertices;
+		this.bottomVertices = bottomVertices;
+		this.edges = edges;
+		this.context = context;
+	}
+
+	/**
+	 * Create bipartite graph from datasets.
+	 *
+	 * @param topVertices dataset of top vertices in the graph
+	 * @param bottomVertices dataset of bottom vertices in the graph
+	 * @param edges dataset of edges between vertices
+	 * @param context Flink execution context
+	 * @return new bipartite graph created from provided datasets
+	 */
+	public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+	}
+
+	/**
+	 * Get dataset with top vertices.
+	 *
+	 * @return dataset with top vertices
+	 */
+	public DataSet<Vertex<KT, VVT>> getTopVertices() {
+		return topVertices;
+	}
+
+	/**
+	 * Get dataset with bottom vertices.
+	 *
+	 * @return dataset with bottom vertices
+	 */
+	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+		return bottomVertices;
+	}
+
+	/**
+	 * Get dataset with graph edges.
+	 *
+	 * @return dataset with graph edges
+	 */
+	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+		return edges;
+	}
+
+	/**
+	 * Convert a bipartite graph into an undirected graph that contains only top vertices. An edge between two vertices
+	 * in the new graph will exist only if the original bipartite graph contains a bottom vertex they are both
+	 * connected to.
+	 *
+	 * The simple projection performs a single join and returns edges containing the bipartite edge values.
+	 *
+	 * Note: KT must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return simple top projection of the bipartite graph
+	 */
+	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
+		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges)
+			.where(1)
+			.equalTo(1)
+			.with(new ProjectionTopSimple<KT, KB, EV>())
+				.name("Simple top projection");
+
+		return Graph.fromDataSet(topVertices, newEdges, context);
+	}
+
+	@ForwardedFieldsFirst("0; 2->2.0")
+	@ForwardedFieldsSecond("0->1; 2->2.1")
+	private static class ProjectionTopSimple<KT, KB, EV>
+	implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>, Edge<KT, Tuple2<EV, EV>>> {
+		private Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+		private Edge<KT, Tuple2<EV, EV>> edge = new Edge<>(null, null, edgeValues);
+
+		@Override
+		public void join(BipartiteEdge<KT, KB, EV> first, BipartiteEdge<KT, KB, EV> second, Collector<Edge<KT, Tuple2<EV, EV>>> out)
+				throws Exception {
+			if (!first.f0.equals(second.f0)) {
+				edge.f0 = first.f0;
+				edge.f1 = second.f0;
+
+				edgeValues.f0 = first.f2;
+				edgeValues.f1 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into an undirected graph that contains only bottom vertices. An edge between two
+	 * vertices in the new graph will exist only if the original bipartite graph contains a top vertex they are both
+	 * connected to.
+	 *
+	 * The simple projection performs a single join and returns edges containing the bipartite edge values.
+	 *
+	 * Note: KB must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return simple bottom projection of the bipartite graph
+	 */
+	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
+		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectionBottomSimple<KT, KB, EV>())
+			.name("Simple bottom projection");
+
+		return Graph.fromDataSet(bottomVertices, newEdges, context);
+	}
+
+	@ForwardedFieldsFirst("1->0; 2->2.0")
+	@ForwardedFieldsSecond("1; 2->2.1")
+	private static class ProjectionBottomSimple<KT, KB, EV>
+	implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>, Edge<KB, Tuple2<EV, EV>>> {
+		private Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+		private Edge<KB, Tuple2<EV, EV>> edge = new Edge<>(null, null, edgeValues);
+
+		@Override
+		public void join(BipartiteEdge<KT, KB, EV> first, BipartiteEdge<KT, KB, EV> second, Collector<Edge<KB, Tuple2<EV, EV>>> out)
+				throws Exception {
+			if (!first.f1.equals(second.f1)) {
+				edge.f0 = first.f1;
+				edge.f1 = second.f1;
+
+				edgeValues.f0 = first.f2;
+				edgeValues.f1 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new
+	 * graph will exist only if the original bipartite graph contains at least one bottom vertex they both connect to.
+	 *
+	 * The full projection performs three joins and returns edges containing the connecting vertex ID and value,
+	 * both top vertex values, and both bipartite edge values.
+	 *
+	 * Note: KT must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return full top projection of the bipartite graph
+	 */
+	public Graph<KT, VVT, Projection<KB, VVB, VVT, EV>> projectionTopFull() {
+		DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices	= joinEdgeWithVertices();
+
+		DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
+			.where(1)
+			.equalTo(1)
+			.with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>())
+				.name("Full top projection");
+
+		return Graph.fromDataSet(topVertices, newEdges, context);
+	}
+
+	private DataSet<Tuple5<KT, KB, EV, VVT, VVB>> joinEdgeWithVertices() {
+		return edges
+			.join(topVertices, JoinHint.REPARTITION_HASH_SECOND)
+			.where(0)
+			.equalTo(0)
+			.projectFirst(0, 1, 2)
+			.<Tuple4<KT, KB, EV, VVT>>projectSecond(1)
+				.name("Edge with vertex")
+			.join(bottomVertices, JoinHint.REPARTITION_HASH_SECOND)
+			.where(1)
+			.equalTo(0)
+			.projectFirst(0, 1, 2, 3)
+			.<Tuple5<KT, KB, EV, VVT, VVB>>projectSecond(1)
+				.name("Edge with vertices");
+	}
+
+	@ForwardedFieldsFirst("0; 1->2.0; 2->2.4; 3->2.2; 4->2.1")
+	@ForwardedFieldsSecond("0->1; 2->2.5; 3->2.3")
+	private static class ProjectionTopFull<KT, KB, EV, VVT, VVB>
+	implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, KB, EV, VVT, VVB>, Edge<KT, Projection<KB, VVB, VVT, EV>>> {
+		private Projection<KB, VVB, VVT, EV> projection = new Projection<>();
+
+		private Edge<KT, Projection<KB, VVB, VVT, EV>> edge = new Edge<>(null, null, projection);
+
+		@Override
+		public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, KB, EV, VVT, VVB> second, Collector<Edge<KT, Projection<KB, VVB, VVT, EV>>> out)
+				throws Exception {
+			if (!first.f0.equals(second.f0)) {
+				edge.f0 = first.f0;
+				edge.f1 = second.f0;
+
+				projection.f0 = first.f1;
+				projection.f1 = first.f4;
+				projection.f2 = first.f3;
+				projection.f3 = second.f3;
+				projection.f4 = first.f2;
+				projection.f5 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the
+	 * new graph will exist only if the original bipartite graph contains at least one top vertex they both connect to.
+	 *
+	 * The full projection performs three joins and returns edges containing the connecting vertex ID and value,
+	 * both bottom vertex values, and both bipartite edge values.
+	 *
+	 * Note: KB must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return full bottom projection of the bipartite graph
+	 */
+	public Graph<KB, VVB, Projection<KT, VVT, VVB, EV>> projectionBottomFull() {
+		DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices	= joinEdgeWithVertices();
+
+		DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>())
+				.name("Full bottom projection");
+
+		return Graph.fromDataSet(bottomVertices, newEdges, context);
+	}
+
+	@ForwardedFieldsFirst("1->0; 2->2.4; 3->2.1; 4->2.2")
+	@ForwardedFieldsSecond("1; 2->2.5; 4->2.3")
+	private static class ProjectionBottomFull<KT, KB, EV, VVT, VVB>
+	implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, KB, EV, VVT, VVB>, Edge<KB, Projection<KT, VVT, VVB, EV>>> {
+		private Projection<KT, VVT, VVB, EV> projection = new Projection<>();
+
+		private Edge<KB, Projection<KT, VVT, VVB, EV>> edge = new Edge<>(null, null, projection);
+
+		@Override
+		public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, KB, EV, VVT, VVB> second, Collector<Edge<KB, Projection<KT, VVT, VVB, EV>>> out)
+			throws Exception {
+			if (!first.f1.equals(second.f1)) {
+				edge.f0 = first.f1;
+				edge.f1 = second.f1;
+
+				projection.f0 = first.f0;
+				projection.f1 = first.f3;
+				projection.f2 = first.f4;
+				projection.f3 = second.f4;
+				projection.f4 = first.f2;
+				projection.f5 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
new file mode 100644
index 0000000..95a9cf6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.graph.Vertex;
+
+/**
+ * The edge value of a full bipartite projection contains:
+ * <ul>
+ *     <li>the ID and vertex value of the connecting vertex</li>
+ *     <li>the vertex value for the source and target vertex</li>
+ *     <li>both edge values from the bipartite edges</li>
+ * </ul>
+ *
+ * @param <KC> the key type of connecting vertices
+ * @param <VVC> the vertex value type of connecting vertices
+ * @param <VV> the vertex value type of top or bottom vertices
+ * @param <EV> the edge value type from bipartite edges
+  */
+public class Projection<KC, VVC, VV, EV> extends Tuple6<KC, VVC, VV, VV, EV, EV> {
+
+	public Projection() {}
+
+	public Projection(
+			Vertex<KC, VVC> connectingVertex,
+			VV sourceVertexValue, VV targetVertexValue,
+			EV sourceEdgeValue, EV targetEdgeValue) {
+		this.f0 = connectingVertex.getId();
+		this.f1 = connectingVertex.getValue();
+		this.f2 = sourceVertexValue;
+		this.f3 = targetVertexValue;
+		this.f4 = sourceEdgeValue;
+		this.f5 = targetEdgeValue;
+	}
+
+	public KC getIntermediateVertexId() {
+		return this.f0;
+	}
+
+	public VVC getIntermediateVertexValue() {
+		return this.f1;
+	}
+
+	public VV getsSourceVertexValue() {
+		return this.f2;
+	}
+
+	public VV getTargetVertexValue() {
+		return this.f3;
+	}
+
+	public EV getSourceEdgeValue() {
+		return this.f4;
+	}
+
+	public EV getTargetEdgeValue() {
+		return this.f5;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
new file mode 100644
index 0000000..ad0106b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteEdgeTest {
+
+	private static final int BOTTOM_ID = 0;
+	private static final int TOP_ID = 1;
+	private static final String VALUE = "value";
+
+	private final BipartiteEdge<Integer, Integer, String> edge = createEdge();
+
+	@Test
+	public void testGetBottomId() {
+		assertEquals(BOTTOM_ID, (long) edge.getBottomId());
+	}
+
+	@Test
+	public void testGetTopId() {
+		assertEquals(TOP_ID, (long) edge.getTopId());
+	}
+
+	@Test
+	public void testGetValue() {
+		assertEquals(VALUE, edge.getValue());
+	}
+
+	@Test
+	public void testSetBottomId() {
+		edge.setBottomId(100);
+		assertEquals(100, (long) edge.getBottomId());
+	}
+
+	@Test
+	public void testSetTopId() {
+		edge.setTopId(100);
+		assertEquals(100, (long) edge.getTopId());
+	}
+
+	@Test
+	public void testSetValue() {
+		edge.setValue("newVal");
+		assertEquals("newVal", edge.getValue());
+	}
+
+	private BipartiteEdge<Integer, Integer, String> createEdge() {
+		return new BipartiteEdge<>(TOP_ID, BOTTOM_ID, VALUE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
new file mode 100644
index 0000000..366cf8e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.flink.graph.generator.TestUtils.compareGraph;
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteGraphTest {
+
+	@Test
+	public void testGetTopVertices() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+
+		assertEquals(
+			Arrays.asList(
+				new Vertex<>(4, "top4"),
+				new Vertex<>(5, "top5"),
+				new Vertex<>(6, "top6")),
+			bipartiteGraph.getTopVertices().collect());
+	}
+
+	@Test
+	public void testGetBottomVertices() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+
+		assertEquals(
+			Arrays.asList(
+				new Vertex<>(1, "bottom1"),
+				new Vertex<>(2, "bottom2"),
+				new Vertex<>(3, "bottom3")),
+			bipartiteGraph.getBottomVertices().collect());
+	}
+
+	@Test
+	public void testSimpleTopProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Tuple2<String, String>> graph = bipartiteGraph.projectionTopSimple();
+
+		compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+		String expected =
+			"(5,4,(5-1,4-1))\n" +
+			"(4,5,(4-1,5-1))\n" +
+			"(6,5,(6-2,5-2))\n" +
+			"(5,6,(5-2,6-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testSimpleBottomProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Tuple2<String, String>> graph = bipartiteGraph.projectionBottomSimple();
+
+		compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+		String expected =
+			"(3,2,(6-3,6-2))\n" +
+			"(2,3,(6-2,6-3))\n" +
+			"(2,1,(5-2,5-1))\n" +
+			"(1,2,(5-1,5-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testFullTopProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Projection<Integer, String, String, String>> graph = bipartiteGraph.projectionTopFull();
+
+		graph.getEdges().print();
+		compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+		String expected =
+			"(5,4,(1,bottom1,top5,top4,5-1,4-1))\n" +
+			"(4,5,(1,bottom1,top4,top5,4-1,5-1))\n" +
+			"(6,5,(2,bottom2,top6,top5,6-2,5-2))\n" +
+			"(5,6,(2,bottom2,top5,top6,5-2,6-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testFullBottomProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Projection<Integer, String, String, String>> graph = bipartiteGraph.projectionBottomFull();
+
+		compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+		String expected =
+			"(3,2,(6,top6,bottom3,bottom2,6-3,6-2))\n" +
+			"(2,3,(6,top6,bottom2,bottom3,6-2,6-3))\n" +
+			"(2,1,(5,top5,bottom2,bottom1,5-2,5-1))\n" +
+			"(1,2,(5,top5,bottom1,bottom2,5-1,5-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	private BipartiteGraph<Integer, Integer, String, String, String> createBipartiteGraph() {
+		ExecutionEnvironment executionEnvironment = ExecutionEnvironment.createCollectionsEnvironment();
+
+		DataSet<Vertex<Integer, String>> topVertices = executionEnvironment.fromCollection(Arrays.asList(
+			new Vertex<>(4, "top4"),
+			new Vertex<>(5, "top5"),
+			new Vertex<>(6, "top6")
+		));
+
+		DataSet<Vertex<Integer, String>> bottomVertices = executionEnvironment.fromCollection(Arrays.asList(
+			new Vertex<>(1, "bottom1"),
+			new Vertex<>(2, "bottom2"),
+			new Vertex<>(3, "bottom3")
+		));
+
+		DataSet<BipartiteEdge<Integer, Integer, String>> edges = executionEnvironment.fromCollection(Arrays.asList(
+			new BipartiteEdge<>(4, 1, "4-1"),
+			new BipartiteEdge<>(5, 1, "5-1"),
+			new BipartiteEdge<>(5, 2, "5-2"),
+			new BipartiteEdge<>(6, 2, "6-2"),
+			new BipartiteEdge<>(6, 3, "6-3")
+		));
+
+		return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, executionEnvironment);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
new file mode 100644
index 0000000..3aafe64
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.Vertex;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProjectionTest {
+
+	private static final int ID = 10;
+
+	private static final String VERTEX_VALUE = "vertex-value";
+	private static final String SOURCE_EDGE_VALUE = "source-edge-value";
+	private static final String TARGET_EDGE_VALUE = "target-edge-value";
+	private static final String SOURCE_VERTEX_VALUE = "source-vertex-value";
+	private static final String TARGET_VERTEX_VALUE = "target-vertex-value";
+
+	private Projection<Integer, String, String, String> projection = createProjection();
+
+	@Test
+	public void testIntermediateVertexGetId() {
+		assertEquals(Integer.valueOf(ID), projection.getIntermediateVertexId());
+	}
+
+	@Test
+	public void testGetIntermediateVertexValue() {
+		assertEquals(VERTEX_VALUE, projection.getIntermediateVertexValue());
+	}
+
+	@Test
+	public void testGetSourceEdgeValue() {
+		assertEquals(SOURCE_EDGE_VALUE, projection.getSourceEdgeValue());
+	}
+
+	@Test
+	public void testGetTargetEdgeValue() {
+		assertEquals(TARGET_EDGE_VALUE, projection.getTargetEdgeValue());
+	}
+
+	@Test
+	public void testGetSourceVertexValue() {
+		assertEquals(SOURCE_VERTEX_VALUE, projection.getsSourceVertexValue());
+	}
+
+	@Test
+	public void testGetTargetVertexValue() {
+		assertEquals(TARGET_VERTEX_VALUE, projection.getTargetVertexValue());
+	}
+
+	private Projection<Integer, String, String, String> createProjection() {
+		return new Projection<>(
+			new Vertex<>(ID, VERTEX_VALUE),
+			SOURCE_VERTEX_VALUE,
+			TARGET_VERTEX_VALUE,
+			SOURCE_EDGE_VALUE,
+			TARGET_EDGE_VALUE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index 3ea5a44..a302a30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -50,7 +50,12 @@ public final class TestUtils {
 	 */
 	public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String expectedVertices, String expectedEdges)
 			throws Exception {
-		// Vertices
+		compareVertices(graph, expectedVertices);
+		compareEdges(graph, expectedEdges);
+	}
+
+	private static <K, VV, EV> void compareVertices(Graph<K, VV, EV> graph, String expectedVertices)
+			throws Exception {
 		if (expectedVertices != null) {
 			List<String> resultVertices = new ArrayList<>();
 
@@ -60,8 +65,10 @@ public final class TestUtils {
 
 			TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n"));
 		}
+	}
 
-		// Edges
+	private static <K, VV, EV> void compareEdges(Graph<K, VV, EV> graph, String expectedEdges)
+			throws Exception {
 		if (expectedEdges != null) {
 			List<String> resultEdges = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 804b3d4..5e15076 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.test.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -37,14 +32,10 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
-
 import org.apache.hadoop.fs.FileSystem;
-
 import org.junit.Assert;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -77,6 +68,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestBaseUtils extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TestBaseUtils.class);
@@ -92,32 +86,32 @@ public class TestBaseUtils extends TestLogger {
 	public static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
 
 	// ------------------------------------------------------------------------
-	
+
 	protected static File logDir;
 
 	protected TestBaseUtils(){
 		verifyJvmOptions();
 	}
-	
+
 	private static void verifyJvmOptions() {
 		long heap = Runtime.getRuntime().maxMemory() >> 20;
 		Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
 				+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
 	}
-	
-	
+
+
 	public static LocalFlinkMiniCluster startCluster(
 		int numTaskManagers,
 		int taskManagerNumSlots,
 		boolean startWebserver,
 		boolean startZooKeeper,
 		boolean singleActorSystem) throws Exception {
-		
+
 		Configuration config = new Configuration();
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
-		
+
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
 
 		if (startZooKeeper) {
@@ -146,7 +140,7 @@ public class TestBaseUtils extends TestLogger {
 
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-		
+
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 
 		LocalFlinkMiniCluster cluster =  new LocalFlinkMiniCluster(config, singleActorSystem);
@@ -164,7 +158,7 @@ public class TestBaseUtils extends TestLogger {
 		if (executor != null) {
 			int numUnreleasedBCVars = 0;
 			int numActiveConnections = 0;
-			
+
 			if (executor.running()) {
 				List<ActorRef> tms = executor.getTaskManagersAsJava();
 				List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<>();
@@ -249,7 +243,7 @@ public class TestBaseUtils extends TestLogger {
 		}
 		return readers;
 	}
-	
+
 	public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
 		return getResultInputStream(resultPath, new String[]{});
 	}
@@ -268,13 +262,13 @@ public class TestBaseUtils extends TestLogger {
 		readAllResultLines(target, resultPath, new String[]{});
 	}
 
-	public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) 
+	public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes)
 			throws IOException {
-		
+
 		readAllResultLines(target, resultPath, excludePrefixes, false);
 	}
 
-	public static void readAllResultLines(List<String> target, String resultPath, 
+	public static void readAllResultLines(List<String> target, String resultPath,
 											String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
 
 		final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
@@ -453,14 +447,14 @@ public class TestBaseUtils extends TestLogger {
 	public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples) {
 		compareResult(result, expected, asTuples, false);
 	}
-	
+
 	private static <T> void compareResult(List<T> result, String expected, boolean asTuples, boolean sort) {
 		String[] expectedStrings = expected.split("\n");
 		String[] resultStrings = new String[result.size()];
-		
+
 		for (int i = 0; i < resultStrings.length; i++) {
 			T val = result.get(i);
-			
+
 			if (asTuples) {
 				if (val instanceof Tuple) {
 					Tuple t = (Tuple) val;
@@ -480,19 +474,25 @@ public class TestBaseUtils extends TestLogger {
 				resultStrings[i] = (val == null) ? "null" : val.toString();
 			}
 		}
-		
-		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
 
 		if (sort) {
 			Arrays.sort(expectedStrings);
 			Arrays.sort(resultStrings);
 		}
-		
+
+		// Include content of both arrays to provide more context in case of a test failure
+		String msg = String.format(
+			"Different elements in arrays: expected %d elements and received %d\n expected: %s\n received: %s",
+			expectedStrings.length, resultStrings.length,
+			Arrays.toString(expectedStrings), Arrays.toString(resultStrings));
+
+		assertEquals(msg, expectedStrings.length, resultStrings.length);
+
 		for (int i = 0; i < expectedStrings.length; i++) {
-			assertEquals(expectedStrings[i], resultStrings[i]);
+			assertEquals(msg, expectedStrings[i], resultStrings[i]);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Comparison methods for tests using sample
 	// --------------------------------------------------------------------------------------------
@@ -523,7 +523,7 @@ public class TestBaseUtils extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 	//  Miscellaneous helper methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected static Collection<Object[]> toParameterList(Configuration ... testConfigs) {
 		ArrayList<Object[]> configs = new ArrayList<>();
 		for (Configuration testConfig : testConfigs) {
@@ -560,7 +560,7 @@ public class TestBaseUtils extends TestLogger {
 			System.err.println("Failed to delete file " + f.getAbsolutePath());
 		}
 	}
-	
+
 	public static String constructTestPath(Class<?> forClass, String folder) {
 		// we create test path that depends on class to prevent name clashes when two tests
 		// create temp files with the same name
@@ -571,7 +571,7 @@ public class TestBaseUtils extends TestLogger {
 		path += (forClass.getName() + "-" + folder);
 		return path;
 	}
-	
+
 	public static String constructTestURI(Class<?> forClass, String folder) {
 		return new File(constructTestPath(forClass, folder)).toURI().toString();
 	}
@@ -597,7 +597,7 @@ public class TestBaseUtils extends TestLogger {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
-	
+
 	public static class TupleComparator<T extends Tuple> implements Comparator<T> {
 
 		@Override
@@ -612,7 +612,7 @@ public class TestBaseUtils extends TestLogger {
 				for (int i = 0; i < o1.getArity(); i++) {
 					Object val1 = o1.getField(i);
 					Object val2 = o2.getField(i);
-					
+
 					int cmp;
 					if (val1 != null && val2 != null) {
 						cmp = compareValues(val1, val2);
@@ -620,16 +620,16 @@ public class TestBaseUtils extends TestLogger {
 					else {
 						cmp = val1 == null ? (val2 == null ? 0 : -1) : 1;
 					}
-					
+
 					if (cmp != 0) {
 						return cmp;
 					}
 				}
-				
+
 				return 0;
 			}
 		}
-		
+
 		@SuppressWarnings("unchecked")
 		private static <X extends Comparable<X>> int compareValues(Object o1, Object o2) {
 			if (o1 instanceof Comparable && o2 instanceof Comparable) {