You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/12/09 20:12:11 UTC
flink git commit: [FLINK-4646] [gelly] Add BipartiateGraph
Repository: flink
Updated Branches:
refs/heads/master 9ab494a73 -> 365cd987c
[FLINK-4646] [gelly] Add BipartiateGraph
This closes #2564
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/365cd987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/365cd987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/365cd987
Branch: refs/heads/master
Commit: 365cd987cc90fa9b399acbb4fe0af3f995f604e3
Parents: 9ab494a
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Tue Sep 27 23:14:09 2016 +0100
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Dec 9 13:58:50 2016 -0500
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Edge.java | 12 +-
.../flink/graph/bipartite/BipartiteEdge.java | 68 ++++
.../flink/graph/bipartite/BipartiteGraph.java | 319 +++++++++++++++++++
.../flink/graph/bipartite/Projection.java | 76 +++++
.../graph/bipartite/BipartiteEdgeTest.java | 70 ++++
.../graph/bipartite/BipartiteGraphTest.java | 146 +++++++++
.../flink/graph/bipartite/ProjectionTest.java | 76 +++++
.../apache/flink/graph/generator/TestUtils.java | 11 +-
.../apache/flink/test/util/TestBaseUtils.java | 74 ++---
9 files changed, 807 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index 2bcce29..8e5f916 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -34,10 +34,10 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
public Edge(){}
- public Edge(K src, K trg, V val) {
- this.f0 = src;
- this.f1 = trg;
- this.f2 = val;
+ /**
+  * Creates an edge from {@code source} to {@code target} carrying {@code value}.
+  * The three arguments are stored in tuple fields {@code f0}, {@code f1} and {@code f2} respectively.
+  */
+ public Edge(K source, K target, V value) {
+ this.f0 = source;
+ this.f1 = target;
+ this.f2 = value;
+ }
/**
@@ -49,8 +49,8 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
return new Edge<>(this.f1, this.f0, this.f2);
}
- public void setSource(K src) {
- this.f0 = src;
+ /**
+  * Sets the source vertex ID, which is stored in tuple field {@code f0}.
+  */
+ public void setSource(K source) {
+ this.f0 = source;
+ }
public K getSource() {
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
new file mode 100644
index 0000000..167e4ec
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+
+/**
+ * An edge of a {@link BipartiteGraph}, linking exactly one top vertex to one bottom vertex.
+ * It is a generalized form of {@link Edge} in which the two endpoint IDs may have different types.
+ *
+ * <p>Values are stored positionally in the underlying {@link Tuple3}: {@code f0} holds the
+ * top vertex ID, {@code f1} the bottom vertex ID and {@code f2} the edge value.
+ *
+ * @param <KT> the key type of the top vertices
+ * @param <KB> the key type of the bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates an edge without setting any of its fields. */
+	public BipartiteEdge() {}
+
+	/**
+	 * Creates an edge between a top and a bottom vertex.
+	 *
+	 * @param topId ID of the top vertex (field {@code f0})
+	 * @param bottomId ID of the bottom vertex (field {@code f1})
+	 * @param value the edge value (field {@code f2})
+	 */
+	public BipartiteEdge(KT topId, KB bottomId, EV value) {
+		f0 = topId;
+		f1 = bottomId;
+		f2 = value;
+	}
+
+	// ---- accessors ----
+
+	/** Returns the top vertex ID ({@code f0}). */
+	public KT getTopId() {
+		return f0;
+	}
+
+	/** Returns the bottom vertex ID ({@code f1}). */
+	public KB getBottomId() {
+		return f1;
+	}
+
+	/** Returns the edge value ({@code f2}). */
+	public EV getValue() {
+		return f2;
+	}
+
+	// ---- mutators ----
+
+	/** Replaces the top vertex ID ({@code f0}). */
+	public void setTopId(KT topId) {
+		f0 = topId;
+	}
+
+	/** Replaces the bottom vertex ID ({@code f1}). */
+	public void setBottomId(KB bottomId) {
+		f1 = bottomId;
+	}
+
+	/** Replaces the edge value ({@code f2}). */
+	public void setValue(EV value) {
+		f2 = value;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
new file mode 100644
index 0000000..b325103
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+/**
+ * The vertices of a bipartite graph are divided into two disjoint sets, referenced by the names "top" and "bottom".
+ * Top and bottom vertices with the same key value represent distinct entities and must be specially handled
+ * when projecting to a simple {@link Graph}. Edges can only exist between a pair of vertices from different vertex
+ * sets, e.g. there can be no edges between a pair of top vertices.
+ *
+ * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
+ * where an edge represents that a particular publication was authored by a particular author.
+ *
+ * <p>The bipartite interface is different from the {@link Graph} interface, so to apply algorithms that work on a
+ * regular graph, a bipartite graph should first be converted into a {@link Graph} instance. This can be achieved by
+ * using the projection methods.
+ *
+ * @param <KT> the key type of top vertices
+ * @param <KB> the key type of bottom vertices
+ * @param <VVT> the vertex value type of top vertices
+ * @param <VVB> the vertex value type of bottom vertices
+ * @param <EV> the edge value type
+ */
+public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
+
+	private final ExecutionEnvironment context;
+	private final DataSet<Vertex<KT, VVT>> topVertices;
+	private final DataSet<Vertex<KB, VVB>> bottomVertices;
+	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
+
+	private BipartiteGraph(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		this.topVertices = topVertices;
+		this.bottomVertices = bottomVertices;
+		this.edges = edges;
+		this.context = context;
+	}
+
+	/**
+	 * Create bipartite graph from datasets.
+	 *
+	 * <p>The datasets are stored as given; no check is made that edge endpoints refer to existing vertices.
+	 *
+	 * @param topVertices dataset of top vertices in the graph
+	 * @param bottomVertices dataset of bottom vertices in the graph
+	 * @param edges dataset of edges between vertices
+	 * @param context Flink execution context
+	 * @return new bipartite graph created from provided datasets
+	 */
+	public static <KT, KB, VVT, VVB, EV> BipartiteGraph<KT, KB, VVT, VVB, EV> fromDataSet(
+			DataSet<Vertex<KT, VVT>> topVertices,
+			DataSet<Vertex<KB, VVB>> bottomVertices,
+			DataSet<BipartiteEdge<KT, KB, EV>> edges,
+			ExecutionEnvironment context) {
+		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
+	}
+
+	/**
+	 * Get dataset with top vertices.
+	 *
+	 * @return dataset with top vertices
+	 */
+	public DataSet<Vertex<KT, VVT>> getTopVertices() {
+		return topVertices;
+	}
+
+	/**
+	 * Get dataset with bottom vertices.
+	 *
+	 * @return dataset with bottom vertices
+	 */
+	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
+		return bottomVertices;
+	}
+
+	/**
+	 * Get dataset with graph edges.
+	 *
+	 * @return dataset with graph edges
+	 */
+	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
+		return edges;
+	}
+
+	/**
+	 * Convert a bipartite graph into an undirected graph that contains only top vertices. An edge between two vertices
+	 * in the new graph will exist only if the original bipartite graph contains a bottom vertex they are both
+	 * connected to.
+	 *
+	 * <p>The simple projection performs a single join and returns edges containing the bipartite edge values.
+	 * Each connected pair of top vertices is emitted in both directions, once per shared bottom vertex.
+	 *
+	 * <p>Note: KT must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return simple top projection of the bipartite graph
+	 */
+	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
+		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges)
+			.where(1)
+			.equalTo(1)
+			.with(new ProjectionTopSimple<KT, KB, EV>())
+				.name("Simple top projection");
+
+		return Graph.fromDataSet(topVertices, newEdges, context);
+	}
+
+	/**
+	 * Turns two bipartite edges that share a bottom vertex into one top-to-top edge.
+	 * Output layout: f0 = first top ID, f1 = second top ID, f2 = (first edge value, second edge value).
+	 */
+	@ForwardedFieldsFirst("0; 2->2.0")
+	@ForwardedFieldsSecond("0->1; 2->2.1")
+	private static class ProjectionTopSimple<KT, KB, EV>
+			implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>, Edge<KT, Tuple2<EV, EV>>> {
+
+		// Output objects are allocated once and refilled for every emitted record.
+		private final Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+		private final Edge<KT, Tuple2<EV, EV>> edge = new Edge<>(null, null, edgeValues);
+
+		@Override
+		public void join(BipartiteEdge<KT, KB, EV> first, BipartiteEdge<KT, KB, EV> second, Collector<Edge<KT, Tuple2<EV, EV>>> out)
+				throws Exception {
+			// Skip pairs with equal top IDs (e.g. an edge joined with itself); they would become self-loops.
+			if (!first.f0.equals(second.f0)) {
+				edge.f0 = first.f0;
+				edge.f1 = second.f0;
+
+				edgeValues.f0 = first.f2;
+				edgeValues.f1 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into an undirected graph that contains only bottom vertices. An edge between two
+	 * vertices in the new graph will exist only if the original bipartite graph contains a top vertex they are both
+	 * connected to.
+	 *
+	 * <p>The simple projection performs a single join and returns edges containing the bipartite edge values.
+	 * Each connected pair of bottom vertices is emitted in both directions, once per shared top vertex.
+	 *
+	 * <p>Note: KB must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return simple bottom projection of the bipartite graph
+	 */
+	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
+		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges = edges.join(edges)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectionBottomSimple<KT, KB, EV>())
+				.name("Simple bottom projection");
+
+		return Graph.fromDataSet(bottomVertices, newEdges, context);
+	}
+
+	/**
+	 * Turns two bipartite edges that share a top vertex into one bottom-to-bottom edge.
+	 * Output layout: f0 = first bottom ID, f1 = second bottom ID, f2 = (first edge value, second edge value).
+	 */
+	@ForwardedFieldsFirst("1->0; 2->2.0")
+	@ForwardedFieldsSecond("1; 2->2.1")
+	private static class ProjectionBottomSimple<KT, KB, EV>
+			implements FlatJoinFunction<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>, Edge<KB, Tuple2<EV, EV>>> {
+
+		// Output objects are allocated once and refilled for every emitted record.
+		private final Tuple2<EV, EV> edgeValues = new Tuple2<>();
+
+		private final Edge<KB, Tuple2<EV, EV>> edge = new Edge<>(null, null, edgeValues);
+
+		@Override
+		public void join(BipartiteEdge<KT, KB, EV> first, BipartiteEdge<KT, KB, EV> second, Collector<Edge<KB, Tuple2<EV, EV>>> out)
+				throws Exception {
+			// Skip pairs with equal bottom IDs (e.g. an edge joined with itself); they would become self-loops.
+			if (!first.f1.equals(second.f1)) {
+				edge.f0 = first.f1;
+				edge.f1 = second.f1;
+
+				edgeValues.f0 = first.f2;
+				edgeValues.f1 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new
+	 * graph will exist only if the original bipartite graph contains at least one bottom vertex they both connect to.
+	 *
+	 * <p>The full projection performs three joins and returns edges containing the connecting vertex ID and value,
+	 * both top vertex values, and both bipartite edge values.
+	 *
+	 * <p>Note: KT must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return full top projection of the bipartite graph
+	 */
+	public Graph<KT, VVT, Projection<KB, VVB, VVT, EV>> projectionTopFull() {
+		DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices = joinEdgeWithVertices();
+
+		DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
+			.where(1)
+			.equalTo(1)
+			.with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>())
+				.name("Full top projection");
+
+		return Graph.fromDataSet(topVertices, newEdges, context);
+	}
+
+	/**
+	 * Attach the top and bottom vertex values to every bipartite edge. Both joins are inner joins, so an edge whose
+	 * top or bottom ID has no matching vertex is dropped.
+	 *
+	 * @return tuples of (top ID, bottom ID, edge value, top vertex value, bottom vertex value)
+	 */
+	private DataSet<Tuple5<KT, KB, EV, VVT, VVB>> joinEdgeWithVertices() {
+		return edges
+			.join(topVertices, JoinHint.REPARTITION_HASH_SECOND)
+			.where(0)
+			.equalTo(0)
+			.projectFirst(0, 1, 2)
+			.<Tuple4<KT, KB, EV, VVT>>projectSecond(1)
+				.name("Edge with vertex")
+			.join(bottomVertices, JoinHint.REPARTITION_HASH_SECOND)
+			.where(1)
+			.equalTo(0)
+			.projectFirst(0, 1, 2, 3)
+			.<Tuple5<KT, KB, EV, VVT, VVB>>projectSecond(1)
+				.name("Edge with vertices");
+	}
+
+	/**
+	 * Turns two enriched edges that share a bottom vertex into one top-to-top edge with a {@link Projection} value:
+	 * (bottom ID, bottom value, first top value, second top value, first edge value, second edge value).
+	 */
+	@ForwardedFieldsFirst("0; 1->2.0; 2->2.4; 3->2.2; 4->2.1")
+	@ForwardedFieldsSecond("0->1; 2->2.5; 3->2.3")
+	private static class ProjectionTopFull<KT, KB, EV, VVT, VVB>
+			implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, KB, EV, VVT, VVB>, Edge<KT, Projection<KB, VVB, VVT, EV>>> {
+
+		// Output objects are allocated once and refilled for every emitted record.
+		private final Projection<KB, VVB, VVT, EV> projection = new Projection<>();
+
+		private final Edge<KT, Projection<KB, VVB, VVT, EV>> edge = new Edge<>(null, null, projection);
+
+		@Override
+		public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, KB, EV, VVT, VVB> second, Collector<Edge<KT, Projection<KB, VVB, VVT, EV>>> out)
+				throws Exception {
+			// Skip pairs with equal top IDs; they would become self-loops.
+			if (!first.f0.equals(second.f0)) {
+				edge.f0 = first.f0;
+				edge.f1 = second.f0;
+
+				projection.f0 = first.f1;
+				projection.f1 = first.f4;
+				projection.f2 = first.f3;
+				projection.f3 = second.f3;
+				projection.f4 = first.f2;
+				projection.f5 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+
+	/**
+	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the
+	 * new graph will exist only if the original bipartite graph contains at least one top vertex they both connect to.
+	 *
+	 * <p>The full projection performs three joins and returns edges containing the connecting vertex ID and value,
+	 * both bottom vertex values, and both bipartite edge values.
+	 *
+	 * <p>Note: KB must override .equals(). This requirement may be removed in a future release.
+	 *
+	 * @return full bottom projection of the bipartite graph
+	 */
+	public Graph<KB, VVB, Projection<KT, VVT, VVB, EV>> projectionBottomFull() {
+		DataSet<Tuple5<KT, KB, EV, VVT, VVB>> edgesWithVertices = joinEdgeWithVertices();
+
+		DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>())
+				.name("Full bottom projection");
+
+		return Graph.fromDataSet(bottomVertices, newEdges, context);
+	}
+
+	/**
+	 * Turns two enriched edges that share a top vertex into one bottom-to-bottom edge with a {@link Projection} value:
+	 * (top ID, top value, first bottom value, second bottom value, first edge value, second edge value).
+	 */
+	// "0->2.0" declares the connecting top ID (first.f0) that is copied into projection.f0,
+	// mirroring the "1->2.0" declaration on ProjectionTopFull.
+	@ForwardedFieldsFirst("0->2.0; 1->0; 2->2.4; 3->2.1; 4->2.2")
+	@ForwardedFieldsSecond("1; 2->2.5; 4->2.3")
+	private static class ProjectionBottomFull<KT, KB, EV, VVT, VVB>
+			implements FlatJoinFunction<Tuple5<KT, KB, EV, VVT, VVB>, Tuple5<KT, KB, EV, VVT, VVB>, Edge<KB, Projection<KT, VVT, VVB, EV>>> {
+
+		// Output objects are allocated once and refilled for every emitted record.
+		private final Projection<KT, VVT, VVB, EV> projection = new Projection<>();
+
+		private final Edge<KB, Projection<KT, VVT, VVB, EV>> edge = new Edge<>(null, null, projection);
+
+		@Override
+		public void join(Tuple5<KT, KB, EV, VVT, VVB> first, Tuple5<KT, KB, EV, VVT, VVB> second, Collector<Edge<KB, Projection<KT, VVT, VVB, EV>>> out)
+				throws Exception {
+			// Skip pairs with equal bottom IDs; they would become self-loops.
+			if (!first.f1.equals(second.f1)) {
+				edge.f0 = first.f1;
+				edge.f1 = second.f1;
+
+				projection.f0 = first.f0;
+				projection.f1 = first.f3;
+				projection.f2 = first.f4;
+				projection.f3 = second.f4;
+				projection.f4 = first.f2;
+				projection.f5 = second.f2;
+
+				out.collect(edge);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
new file mode 100644
index 0000000..95a9cf6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.graph.Vertex;
+
+/**
+ * The edge value of a full bipartite projection contains:
+ * <ul>
+ * <li>the ID and vertex value of the connecting vertex</li>
+ * <li>the vertex value for the source and target vertex</li>
+ * <li>both edge values from the bipartite edges</li>
+ * </ul>
+ *
+ * <p>These are stored positionally: {@code f0} connecting vertex ID, {@code f1} connecting vertex value,
+ * {@code f2}/{@code f3} source/target vertex value, {@code f4}/{@code f5} source/target edge value.
+ *
+ * @param <KC> the key type of connecting vertices
+ * @param <VVC> the vertex value type of connecting vertices
+ * @param <VV> the vertex value type of top or bottom vertices
+ * @param <EV> the edge value type from bipartite edges
+ */
+public class Projection<KC, VVC, VV, EV> extends Tuple6<KC, VVC, VV, VV, EV, EV> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Creates a projection without setting any of its fields. */
+	public Projection() {}
+
+	/**
+	 * Creates a projection from the connecting vertex and the values of the projected vertices and edges.
+	 *
+	 * @param connectingVertex the vertex shared by source and target; its ID and value are copied into f0 and f1
+	 * @param sourceVertexValue value of the source vertex
+	 * @param targetVertexValue value of the target vertex
+	 * @param sourceEdgeValue value of the bipartite edge from the source vertex to the connecting vertex
+	 * @param targetEdgeValue value of the bipartite edge from the target vertex to the connecting vertex
+	 */
+	public Projection(
+			Vertex<KC, VVC> connectingVertex,
+			VV sourceVertexValue, VV targetVertexValue,
+			EV sourceEdgeValue, EV targetEdgeValue) {
+		this.f0 = connectingVertex.getId();
+		this.f1 = connectingVertex.getValue();
+		this.f2 = sourceVertexValue;
+		this.f3 = targetVertexValue;
+		this.f4 = sourceEdgeValue;
+		this.f5 = targetEdgeValue;
+	}
+
+	/** Returns the ID of the connecting (intermediate) vertex. */
+	public KC getIntermediateVertexId() {
+		return this.f0;
+	}
+
+	/** Returns the value of the connecting (intermediate) vertex. */
+	public VVC getIntermediateVertexValue() {
+		return this.f1;
+	}
+
+	/** Returns the value of the source vertex. */
+	public VV getSourceVertexValue() {
+		return this.f2;
+	}
+
+	/**
+	 * Returns the value of the source vertex.
+	 *
+	 * @deprecated misspelled accessor kept for compatibility; use {@link #getSourceVertexValue()} instead
+	 */
+	@Deprecated
+	public VV getsSourceVertexValue() {
+		return getSourceVertexValue();
+	}
+
+	/** Returns the value of the target vertex. */
+	public VV getTargetVertexValue() {
+		return this.f3;
+	}
+
+	/** Returns the value of the bipartite edge on the source side. */
+	public EV getSourceEdgeValue() {
+		return this.f4;
+	}
+
+	/** Returns the value of the bipartite edge on the target side. */
+	public EV getTargetEdgeValue() {
+		return this.f5;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
new file mode 100644
index 0000000..ad0106b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.bipartite.BipartiteEdge;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BipartiteEdgeTest {
+
+ private static final int BOTTOM_ID = 0;
+ private static final int TOP_ID = 1;
+ private static final String VALUE = "value";
+
+ private final BipartiteEdge<Integer, Integer, String> edge = createEdge();
+
+ @Test
+ public void testGetBottomId() {
+ assertEquals(BOTTOM_ID, (long) edge.getBottomId());
+ }
+
+ @Test
+ public void testGetTopId() {
+ assertEquals(TOP_ID, (long) edge.getTopId());
+ }
+
+ @Test
+ public void testGetValue() {
+ assertEquals(VALUE, edge.getValue());
+ }
+
+ @Test
+ public void testSetBottomId() {
+ edge.setBottomId(100);
+ assertEquals(100, (long) edge.getBottomId());
+ }
+
+ @Test
+ public void testSetTopId() {
+ edge.setTopId(100);
+ assertEquals(100, (long) edge.getTopId());
+ }
+
+ @Test
+ public void testSetValue() {
+ edge.setValue("newVal");
+ assertEquals("newVal", edge.getValue());
+ }
+
+ private BipartiteEdge<Integer, Integer, String> createEdge() {
+ return new BipartiteEdge<>(TOP_ID, BOTTOM_ID, VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
new file mode 100644
index 0000000..366cf8e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.flink.graph.generator.TestUtils.compareGraph;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link BipartiteGraph} accessors and projections on a small fixed graph
+ * (top vertices 4-6, bottom vertices 1-3) built in a collections environment.
+ */
+public class BipartiteGraphTest {
+
+	@Test
+	public void testGetTopVertices() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+
+		assertEquals(
+			Arrays.asList(
+				new Vertex<>(4, "top4"),
+				new Vertex<>(5, "top5"),
+				new Vertex<>(6, "top6")),
+			bipartiteGraph.getTopVertices().collect());
+	}
+
+	@Test
+	public void testGetBottomVertices() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+
+		assertEquals(
+			Arrays.asList(
+				new Vertex<>(1, "bottom1"),
+				new Vertex<>(2, "bottom2"),
+				new Vertex<>(3, "bottom3")),
+			bipartiteGraph.getBottomVertices().collect());
+	}
+
+	@Test
+	public void testSimpleTopProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Tuple2<String, String>> graph = bipartiteGraph.projectionTopSimple();
+
+		compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+		String expected =
+			"(5,4,(5-1,4-1))\n" +
+			"(4,5,(4-1,5-1))\n" +
+			"(6,5,(6-2,5-2))\n" +
+			"(5,6,(5-2,6-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testSimpleBottomProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Tuple2<String, String>> graph = bipartiteGraph.projectionBottomSimple();
+
+		compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+		String expected =
+			"(3,2,(6-3,6-2))\n" +
+			"(2,3,(6-2,6-3))\n" +
+			"(2,1,(5-2,5-1))\n" +
+			"(1,2,(5-1,5-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testFullTopProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Projection<Integer, String, String, String>> graph = bipartiteGraph.projectionTopFull();
+
+		compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5");
+
+		String expected =
+			"(5,4,(1,bottom1,top5,top4,5-1,4-1))\n" +
+			"(4,5,(1,bottom1,top4,top5,4-1,5-1))\n" +
+			"(6,5,(2,bottom2,top6,top5,6-2,5-2))\n" +
+			"(5,6,(2,bottom2,top5,top6,5-2,6-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	@Test
+	public void testFullBottomProjection() throws Exception {
+		BipartiteGraph<Integer, Integer, String, String, String> bipartiteGraph = createBipartiteGraph();
+		Graph<Integer, String, Projection<Integer, String, String, String>> graph = bipartiteGraph.projectionBottomFull();
+
+		compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2");
+
+		String expected =
+			"(3,2,(6,top6,bottom3,bottom2,6-3,6-2))\n" +
+			"(2,3,(6,top6,bottom2,bottom3,6-2,6-3))\n" +
+			"(2,1,(5,top5,bottom2,bottom1,5-2,5-1))\n" +
+			"(1,2,(5,top5,bottom1,bottom2,5-1,5-2))";
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected);
+	}
+
+	/**
+	 * Builds the fixture graph: edges 4-1, 5-1, 5-2, 6-2, 6-3, each valued "top-bottom".
+	 */
+	private BipartiteGraph<Integer, Integer, String, String, String> createBipartiteGraph() {
+		ExecutionEnvironment executionEnvironment = ExecutionEnvironment.createCollectionsEnvironment();
+
+		DataSet<Vertex<Integer, String>> topVertices = executionEnvironment.fromCollection(Arrays.asList(
+			new Vertex<>(4, "top4"),
+			new Vertex<>(5, "top5"),
+			new Vertex<>(6, "top6")
+		));
+
+		DataSet<Vertex<Integer, String>> bottomVertices = executionEnvironment.fromCollection(Arrays.asList(
+			new Vertex<>(1, "bottom1"),
+			new Vertex<>(2, "bottom2"),
+			new Vertex<>(3, "bottom3")
+		));
+
+		DataSet<BipartiteEdge<Integer, Integer, String>> edges = executionEnvironment.fromCollection(Arrays.asList(
+			new BipartiteEdge<>(4, 1, "4-1"),
+			new BipartiteEdge<>(5, 1, "5-1"),
+			new BipartiteEdge<>(5, 2, "5-2"),
+			new BipartiteEdge<>(6, 2, "6-2"),
+			new BipartiteEdge<>(6, 3, "6-3")
+		));
+
+		return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, executionEnvironment);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
new file mode 100644
index 0000000..3aafe64
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.bipartite;
+
+import org.apache.flink.graph.Vertex;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProjectionTest {
+
+ private static final int ID = 10;
+
+ private static final String VERTEX_VALUE = "vertex-value";
+ private static final String SOURCE_EDGE_VALUE = "source-edge-value";
+ private static final String TARGET_EDGE_VALUE = "target-edge-value";
+ private static final String SOURCE_VERTEX_VALUE = "source-vertex-value";
+ private static final String TARGET_VERTEX_VALUE = "target-vertex-value";
+
+ private Projection<Integer, String, String, String> projection = createProjection();
+
+ @Test
+ public void testIntermediateVertexGetId() {
+ assertEquals(Integer.valueOf(ID), projection.getIntermediateVertexId());
+ }
+
+ @Test
+ public void testGetIntermediateVertexValue() {
+ assertEquals(VERTEX_VALUE, projection.getIntermediateVertexValue());
+ }
+
+ @Test
+ public void testGetSourceEdgeValue() {
+ assertEquals(SOURCE_EDGE_VALUE, projection.getSourceEdgeValue());
+ }
+
+ @Test
+ public void testGetTargetEdgeValue() {
+ assertEquals(TARGET_EDGE_VALUE, projection.getTargetEdgeValue());
+ }
+
+ @Test
+ public void testGetSourceVertexValue() {
+ assertEquals(SOURCE_VERTEX_VALUE, projection.getsSourceVertexValue());
+ }
+
+ @Test
+ public void testGetTargetVertexValue() {
+ assertEquals(TARGET_VERTEX_VALUE, projection.getTargetVertexValue());
+ }
+
+ private Projection<Integer, String, String, String> createProjection() {
+ return new Projection<>(
+ new Vertex<>(ID, VERTEX_VALUE),
+ SOURCE_VERTEX_VALUE,
+ TARGET_VERTEX_VALUE,
+ SOURCE_EDGE_VALUE,
+ TARGET_EDGE_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index 3ea5a44..a302a30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -50,7 +50,12 @@ public final class TestUtils {
*/
public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String expectedVertices, String expectedEdges)
throws Exception {
- // Vertices
+ compareVertices(graph, expectedVertices); // no-op when expectedVertices is null
+ compareEdges(graph, expectedEdges); // no-op when expectedEdges is null
+ }
+
+ private static <K, VV, EV> void compareVertices(Graph<K, VV, EV> graph, String expectedVertices)
+ throws Exception {
if (expectedVertices != null) {
List<String> resultVertices = new ArrayList<>();
@@ -60,8 +65,10 @@ public final class TestUtils {
TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n"));
}
+ }
- // Edges
+ private static <K, VV, EV> void compareEdges(Graph<K, VV, EV> graph, String expectedEdges)
+ throws Exception {
if (expectedEdges != null) {
List<String> resultEdges = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 804b3d4..5e15076 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -18,17 +18,12 @@
package org.apache.flink.test.util;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import akka.util.Timeout;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -37,14 +32,10 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.junit.Assert;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
@@ -77,6 +68,9 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class TestBaseUtils extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(TestBaseUtils.class);
@@ -92,32 +86,32 @@ public class TestBaseUtils extends TestLogger {
public static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
// ------------------------------------------------------------------------
-
+
protected static File logDir;
protected TestBaseUtils(){
verifyJvmOptions();
}
-
+
private static void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
-
-
+
+
public static LocalFlinkMiniCluster startCluster(
int numTaskManagers,
int taskManagerNumSlots,
boolean startWebserver,
boolean startZooKeeper,
boolean singleActorSystem) throws Exception {
-
+
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
-
+
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
if (startZooKeeper) {
@@ -146,7 +140,7 @@ public class TestBaseUtils extends TestLogger {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
+
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem);
@@ -164,7 +158,7 @@ public class TestBaseUtils extends TestLogger {
if (executor != null) {
int numUnreleasedBCVars = 0;
int numActiveConnections = 0;
-
+
if (executor.running()) {
List<ActorRef> tms = executor.getTaskManagersAsJava();
List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<>();
@@ -249,7 +243,7 @@ public class TestBaseUtils extends TestLogger {
}
return readers;
}
-
+
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
return getResultInputStream(resultPath, new String[]{});
}
@@ -268,13 +262,13 @@ public class TestBaseUtils extends TestLogger {
readAllResultLines(target, resultPath, new String[]{});
}
- public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes)
+ public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes)
throws IOException {
-
+
readAllResultLines(target, resultPath, excludePrefixes, false);
}
- public static void readAllResultLines(List<String> target, String resultPath,
+ public static void readAllResultLines(List<String> target, String resultPath,
String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
@@ -453,14 +447,14 @@ public class TestBaseUtils extends TestLogger {
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples) {
compareResult(result, expected, asTuples, false);
}
-
+
private static <T> void compareResult(List<T> result, String expected, boolean asTuples, boolean sort) {
String[] expectedStrings = expected.split("\n");
String[] resultStrings = new String[result.size()];
-
+
for (int i = 0; i < resultStrings.length; i++) {
T val = result.get(i);
-
+
if (asTuples) {
if (val instanceof Tuple) {
Tuple t = (Tuple) val;
@@ -480,19 +474,25 @@ public class TestBaseUtils extends TestLogger {
resultStrings[i] = (val == null) ? "null" : val.toString();
}
}
-
- assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
if (sort) {
Arrays.sort(expectedStrings);
Arrays.sort(resultStrings);
}
-
+
+ // Include content of both arrays to provide more context in case of a test failure
+ String msg = String.format(
+ "Different elements in arrays: expected %d elements and received %d\n expected: %s\n received: %s",
+ expectedStrings.length, resultStrings.length,
+ Arrays.toString(expectedStrings), Arrays.toString(resultStrings));
+
+ assertEquals(msg, expectedStrings.length, resultStrings.length);
+
for (int i = 0; i < expectedStrings.length; i++) {
- assertEquals(expectedStrings[i], resultStrings[i]);
+ assertEquals(msg, expectedStrings[i], resultStrings[i]);
}
}
-
+
// --------------------------------------------------------------------------------------------
// Comparison methods for tests using sample
// --------------------------------------------------------------------------------------------
@@ -523,7 +523,7 @@ public class TestBaseUtils extends TestLogger {
// --------------------------------------------------------------------------------------------
// Miscellaneous helper methods
// --------------------------------------------------------------------------------------------
-
+
protected static Collection<Object[]> toParameterList(Configuration ... testConfigs) {
ArrayList<Object[]> configs = new ArrayList<>();
for (Configuration testConfig : testConfigs) {
@@ -560,7 +560,7 @@ public class TestBaseUtils extends TestLogger {
System.err.println("Failed to delete file " + f.getAbsolutePath());
}
}
-
+
public static String constructTestPath(Class<?> forClass, String folder) {
// we create test path that depends on class to prevent name clashes when two tests
// create temp files with the same name
@@ -571,7 +571,7 @@ public class TestBaseUtils extends TestLogger {
path += (forClass.getName() + "-" + folder);
return path;
}
-
+
public static String constructTestURI(Class<?> forClass, String folder) {
return new File(constructTestPath(forClass, folder)).toURI().toString();
}
@@ -597,7 +597,7 @@ public class TestBaseUtils extends TestLogger {
return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
}
-
+
public static class TupleComparator<T extends Tuple> implements Comparator<T> {
@Override
@@ -612,7 +612,7 @@ public class TestBaseUtils extends TestLogger {
for (int i = 0; i < o1.getArity(); i++) {
Object val1 = o1.getField(i);
Object val2 = o2.getField(i);
-
+
int cmp;
if (val1 != null && val2 != null) {
cmp = compareValues(val1, val2);
@@ -620,16 +620,16 @@ public class TestBaseUtils extends TestLogger {
else {
cmp = val1 == null ? (val2 == null ? 0 : -1) : 1;
}
-
+
if (cmp != 0) {
return cmp;
}
}
-
+
return 0;
}
}
-
+
@SuppressWarnings("unchecked")
private static <X extends Comparable<X>> int compareValues(Object o1, Object o2) {
if (o1 instanceof Comparable && o2 instanceof Comparable) {