Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/09/28 21:59:53 UTC

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2564

    [FLINK-2254] Add BipartiateGraph class

    This PR implements the BipartiteGraph class with its supporting classes and lays the foundation for future work on bipartite graph support. I have not added documentation yet because I would like to make sure that this approach is in line with what the community has in mind for bipartite graph support. If this PR looks good, I'll continue with documentation and other related tasks.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink bipartite-graph

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2564
    
----
commit 602ba8532a6988c0b5bc36c51ea3458c44351874
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-09-27T22:14:09Z

    [FLINK-2254] Add BipartiateGraph class

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Thank you both for your work @mushketyk and @greghogan!
    Please keep in mind that we should always add documentation for every new feature, especially a big one such as support for a new graph type. We added the checklist template to each new PR so that we don't forget about it :)
    Can you please open a JIRA issue to track the missing docs for bipartite graphs? Thank you!



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Try switching to `ExecutionEnvironment.createCollectionsEnvironment()`.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Agreed, I would amend my earlier suggestion to say we only need to start with two projection methods (for each of top and bottom), something like
      `public Graph<TK, TVV, Tuple2<EV, EV>> topProjectionSimple() {`
    and
      `public Graph<TK, TVV, TopProjection<TVV, BK, BVV, EV>> topProjection() {`
    
    `TopProjection` (we can find better names) would be a `Tuple6` with POJO accessors as with `Vertex`, `Edge`, etc.
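
A minimal sketch of the "tuple with POJO accessors" idea, in plain Java without any Flink types. `PositionalTriple` and `ProjectionValueSketch` are hypothetical names, and the three fields are an assumption made for illustration; the thread does not say which six fields the proposed `Tuple6` would carry.

```java
// Minimal positional tuple, standing in for a Flink Tuple class (assumption: not the real type).
class PositionalTriple<A, B, C> {
    public A f0;
    public B f1;
    public C f2;

    PositionalTriple(A f0, B f1, C f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

// Hypothetical projection value: named accessors layered over the positional fields.
final class ProjectionValueSketch<BK, EV> extends PositionalTriple<BK, EV, EV> {

    ProjectionValueSketch(BK intermediateVertexId, EV sourceEdgeValue, EV targetEdgeValue) {
        super(intermediateVertexId, sourceEdgeValue, targetEdgeValue);
    }

    // Each accessor hides the field position from the caller.
    BK getIntermediateVertexId() {
        return f0;
    }

    EV getSourceEdgeValue() {
        return f1;
    }

    EV getTargetEdgeValue() {
        return f2;
    }

    public static void main(String[] args) {
        ProjectionValueSketch<String, Double> value =
            new ProjectionValueSketch<>("paper1", 0.5, 0.75);
        System.out.println(value.getIntermediateVertexId() + " "
            + value.getSourceEdgeValue() + " " + value.getTargetEdgeValue());
    }
}
```

Because each accessor is hand-written against a field position, a mix-up such as returning `f1` from `getTargetEdgeValue()` would compile without complaint, which is one reason such accessors may deserve their own unit tests.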



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Providing a flattened tuple is certainly better than having the user implement the reduce, but I think we should provide separate methods for the default and custom operations. A projection is a very well-defined operation: create a graph where there is an edge between any pair of vertices with a common neighbor in the bipartite graph. If the user wants to apply mappers or other transformations on the vertices and edges, they can do so afterwards, using the graph methods. The problem is that with a projection, some information is lost, e.g. the edge values. For these cases, we can provide a custom projection method where we give the labels in a flattened tuple as @greghogan suggested, but I'm afraid the API will look ugly with a Tuple8 there. Another, maybe friendlier, solution would be to attach the labels to the projection graph edges. What do you think?
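
The default projection described in this comment can be sketched in plain Java collections. This is an illustration only, not the Gelly implementation: `TopProjectionSketch` and its `topProjection` method are hypothetical names, edges are represented as `{topId, bottomId}` string pairs, and edge values are dropped on purpose to show the information loss mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java sketch (no Flink types): projects a bipartite graph onto its top vertices.
final class TopProjectionSketch {

    /**
     * Each element of edges is {topId, bottomId}.
     * Returns the projected edges as "a-b" strings with a < b, sorted and de-duplicated.
     */
    static Set<String> topProjection(List<String[]> edges) {
        // Group top vertex ids by the bottom vertex they connect to.
        Map<String, List<String>> topsByBottom = new HashMap<>();
        for (String[] e : edges) {
            topsByBottom.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        // Every pair of distinct top vertices sharing a bottom neighbor becomes one edge.
        Set<String> projected = new TreeSet<>();
        for (List<String> tops : topsByBottom.values()) {
            for (String a : tops) {
                for (String b : tops) {
                    if (a.compareTo(b) < 0) {
                        projected.add(a + "-" + b);
                    }
                }
            }
        }
        return projected;
    }

    public static void main(String[] args) {
        List<String[]> edges = new ArrayList<>();
        edges.add(new String[] {"alice", "paper1"});
        edges.add(new String[] {"bob", "paper1"});
        edges.add(new String[] {"bob", "paper2"});
        edges.add(new String[] {"carol", "paper2"});
        System.out.println(topProjection(edges)); // prints [alice-bob, bob-carol]
    }
}
```

In the output, nothing records which paper connected each pair of authors; carrying that along is what the custom projection or the labeled projection edges would be for.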



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90926409
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    --- End diff --
    
    Extra space.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81218490
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
    + * of top vertices.
    + *
    + * <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
    + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VT> the top vertices value type
    + * @param <VB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VT, VB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VT>> topVertices;
    +	private final DataSet<Vertex<KB, VB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
    +	 *
    +	 * <p>Caller should provide a function that will create an edge between two top vertices. This function will receive
    --- End diff --
    
    I'm not sure whether this is a good idea. Why leave this to the user?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81245767
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,7 +480,8 @@ protected static File asFile(String path) {
     			}
     		}
     		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +		assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
    --- End diff --
    
    The issue here is that the assertion compares the lengths of the arrays, so on failure JUnit prints only the compared numbers (say 2 and 0) and not the contents of the arrays.
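
A small self-contained sketch of the difference, without JUnit. `LengthAssertionSketch`, `describeMismatch`, and `assertSameLength` are hypothetical names; `assertSameLength` only imitates the shape of a failed length comparison, while the message format string is the one from the diff above.

```java
import java.util.Arrays;

final class LengthAssertionSketch {

    // Builds the failure message from the diff: it includes both arrays, not only their lengths.
    static String describeMismatch(String[] expected, String[] actual) {
        return String.format(
            "Wrong number of elements result. Expected: %s. Result: %s.",
            Arrays.toString(expected), Arrays.toString(actual));
    }

    // Stand-in for a length assertion with a custom message (assumption: not JUnit itself).
    static void assertSameLength(String[] expected, String[] actual) {
        if (expected.length != actual.length) {
            throw new AssertionError(describeMismatch(expected, actual)
                + " expected:<" + expected.length + "> but was:<" + actual.length + ">");
        }
    }

    public static void main(String[] args) {
        try {
            assertSameLength(new String[] {"a", "b"}, new String[0]);
        } catch (AssertionError e) {
            // The message now shows what was expected, not just the numbers 2 and 0.
            System.out.println(e.getMessage());
        }
    }
}
```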



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @greghogan,
    
    I like your idea of providing a different API for projections. This should be better than my approach.
    @vasia What do you think about this?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90922934
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    + * A BipartiteEdge represents a link between a top and bottom vertices
    + * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
    --- End diff --
    
    "It is a generalized form of {@link Edge} where the source and target vertex IDs can be of different types.", or similar?
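
The "generalized form" wording can be illustrated with a plain-Java sketch. The classes below are hypothetical stand-ins, not the Gelly `Edge` or `BipartiteEdge` (which, per the diff's imports, builds on `Tuple3`); they only show that an edge with one shared ID type is the special case of an edge with two ID types.

```java
// Hypothetical stand-in for an edge whose endpoints may have different ID types.
class TwoKeyEdgeSketch<KT, KB, EV> {
    private final KT topId;
    private final KB bottomId;
    private final EV value;

    TwoKeyEdgeSketch(KT topId, KB bottomId, EV value) {
        this.topId = topId;
        this.bottomId = bottomId;
        this.value = value;
    }

    KT getTopId() {
        return topId;
    }

    KB getBottomId() {
        return bottomId;
    }

    EV getValue() {
        return value;
    }
}

// A regular edge is the special case in which both endpoints share one ID type.
final class SameKeyEdgeSketch<K, EV> extends TwoKeyEdgeSketch<K, K, EV> {

    SameKeyEdgeSketch(K source, K target, EV value) {
        super(source, target, value);
    }

    K getSource() {
        return getTopId();
    }

    K getTarget() {
        return getBottomId();
    }
}

final class EdgeSketchDemo {
    public static void main(String[] args) {
        // Author name (String) linked to a publication id (Long).
        TwoKeyEdgeSketch<String, Long, Double> authored =
            new TwoKeyEdgeSketch<>("alice", 42L, 1.0);
        // Publication id linked to another publication id: same key type on both ends.
        SameKeyEdgeSketch<Long, String> cites = new SameKeyEdgeSketch<>(42L, 43L, "cites");
        System.out.println(authored.getTopId() + " -> " + authored.getBottomId());
        System.out.println(cites.getSource() + " -> " + cites.getTarget());
    }
}
```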



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r91239047
  
    --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/ProjectionTest.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class ProjectionTest {
    --- End diff --
    
    I know that testing getters/setters is considered a waste of CPU cycles, but since the getters/setters here are not auto-generated and need to access the correct tuple fields, I decided to add tests for them.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90923609
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    --- End diff --
    
    Empty line.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Yes, you are right, `bipartite`.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @vasia I don't think anybody is shepherding this PR :)



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81781081
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    --- End diff --
    
    Would it be preferable for IDE command completion to call this method `projectTopSimple` (and then have `projectTopFull` / `projectBottomSimple` / `projectBottomFull`)?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90926642
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    --- End diff --
    
    "Convert a bipartite graph". Also, the Javadoc should state that an undirected graph is returned.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81218403
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
    + * of top vertices.
    + *
    + * <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
    + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VT> the top vertices value type
    + * @param <VB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VT, VB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VT>> topVertices;
    +	private final DataSet<Vertex<KB, VB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
    --- End diff --
    
    they *are both connected to



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81776392
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    --- End diff --
    
    Let's check with @vasia first. She may prefer the current type parameter names or have a better suggestion.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81218538
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
    @@ -34,10 +34,10 @@
     
     	public Edge(){}
     
    -	public Edge(K src, K trg, V val) {
    -		this.f0 = src;
    -		this.f1 = trg;
    -		this.f2 = val;
    +	public Edge(K source, K target, V value) {
    --- End diff --
    
    Why did you change these?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694124
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    +
    +		DataSet<Edge<TK, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					BipartiteEdge<TK, BK, EV> edge1 = value.f0;
    +					BipartiteEdge<TK, BK, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					return new Edge<>(
    --- End diff --
    
    Thank you. Got it.
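
The join/filter/map chain quoted above can be pictured without Flink. What follows is only a minimal sketch under stated assumptions: `BEdge` and `PEdge` are hypothetical stand-ins for `BipartiteEdge` and `Edge` (they are not Gelly classes), plain lists replace `DataSet`, and a nested loop replaces the distributed self-join. The map step is cut off in the quote, so the contents of the projected edge here follow the Javadoc rather than the code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SimpleTopProjectionSketch {

    /** Hypothetical stand-in for BipartiteEdge (top id, bottom id, value). */
    static final class BEdge {
        final String topId;
        final String bottomId;
        final String value;

        BEdge(String topId, String bottomId, String value) {
            this.topId = topId;
            this.bottomId = bottomId;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a projected edge carrying both original edge values. */
    static final class PEdge {
        final String source;
        final String target;
        final String firstValue;
        final String secondValue;

        PEdge(String source, String target, String firstValue, String secondValue) {
            this.source = source;
            this.target = target;
            this.firstValue = firstValue;
            this.secondValue = secondValue;
        }

        @Override
        public String toString() {
            return source + " -> " + target + " (" + firstValue + ", " + secondValue + ")";
        }
    }

    static List<PEdge> simpleTopProjection(List<BEdge> edges) {
        List<PEdge> projected = new ArrayList<>();
        for (BEdge e1 : edges) {
            for (BEdge e2 : edges) {
                // Pair edges that share a bottom vertex, like where(1).equalTo(1).
                boolean sameBottom = e1.bottomId.equals(e2.bottomId);
                // Drop pairs that start from the same top vertex, like the filter.
                boolean differentTop = !e1.topId.equals(e2.topId);
                if (sameBottom && differentTop) {
                    projected.add(new PEdge(e1.topId, e2.topId, e1.value, e2.value));
                }
            }
        }
        return projected;
    }

    public static void main(String[] args) {
        List<BEdge> edges = Arrays.asList(
            new BEdge("t1", "b1", "x"),
            new BEdge("t2", "b1", "y"),
            new BEdge("t3", "b2", "z"));
        for (PEdge edge : simpleTopProjection(edges)) {
            System.out.println(edge);
        }
    }
}
```

With this input the sketch yields an edge in each direction between `t1` and `t2`, and none for `t3`, which shares no bottom vertex with another top vertex. Emitting both directions is what makes the result effectively undirected.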



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90937740
  
    --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/BipartiteEdgeTest.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class BipartiteEdgeTest {
    +
    +	private static final int BOTTOM_ID = 0;
    +	private static final int TOP_ID = 1;
    +	private static final String VALUE = "value";
    +
    +	private final BipartiteEdge<Integer, Integer, String> edge = createEdge();
    +
    +	@Test
    +	public void testGetBottomId() {
    +		assertEquals(Integer.valueOf(BOTTOM_ID), edge.getBottomId());
    +	}
    +
    +	@Test
    +	public void testGetTopId() {
    +		assertEquals(Integer.valueOf(TOP_ID), edge.getTopId());
    +	}
    +
    +	@Test
    +	public void testGetValue() {
    +		assertEquals(VALUE, edge.getValue());
    +	}
    +
    +	@Test
    +	public void testSetBottomId() {
    +		edge.setBottomId(100);
    +		assertEquals(Integer.valueOf(100), edge.getBottomId());
    --- End diff --
    
    Does auto-boxing not work here?
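
A likely reason for the explicit `Integer.valueOf` is overload resolution. JUnit 4's `Assert` declares both `assertEquals(long, long)` and `assertEquals(Object, Object)`, and a call that mixes an `int` literal with an `Integer` fits both only after boxing or unboxing, so the compiler reports it as ambiguous. The sketch below reproduces this with two hypothetical `check` overloads standing in for the JUnit methods, so it runs without JUnit on the classpath.

```java
class BoxingOverloadSketch {

    // Hypothetical stand-in for Assert.assertEquals(long, long).
    static String check(long expected, long actual) {
        return "long,long";
    }

    // Hypothetical stand-in for Assert.assertEquals(Object, Object).
    static String check(Object expected, Object actual) {
        return "Object,Object";
    }

    public static void main(String[] args) {
        Integer boxed = Integer.valueOf(100);

        // check(100, boxed);  // does not compile: reference to check is ambiguous

        // Both arguments are objects, so the Object overload applies without conversion.
        System.out.println(check(Integer.valueOf(100), boxed));

        // Both arguments are primitives, so the long overload applies by widening.
        System.out.println(check(100, boxed.intValue()));
    }
}
```

Either explicit form compiles. Boxing the expected value, as the test does, keeps the comparison going through `equals`.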



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @greghogan,
    
    I've updated the PR according to your review and rebased it on top of the `master` branch.
    
    The only thing I didn't change is the message in the `assertEquals` you pointed to, since an error message like "Wrong number of elements result. Expected 4, actual 3." is not very helpful. I think it is much more helpful for debugging to see the contents of the arrays and figure out why their lengths differ.
    




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r91618485
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,16 +480,22 @@ protected static File asFile(String path) {
     				resultStrings[i] = (val == null) ? "null" : val.toString();
     			}
     		}
    -		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +
    +		//
    +		String msg = String.format(
    +			"Different elements in arrays. Expected %d elements: %s. Actual %d elements: %s",
    +			expectedStrings.length, Arrays.toString(expectedStrings),
    +			resultStrings.length, Arrays.toString(resultStrings));
    +
    +		assertEquals(msg, expectedStrings.length, resultStrings.length);
     
     		if (sort) {
     			Arrays.sort(expectedStrings);
     			Arrays.sort(resultStrings);
     		}
     		
     		for (int i = 0; i < expectedStrings.length; i++) {
    -			assertEquals(expectedStrings[i], resultStrings[i]);
    +			assertEquals(msg, expectedStrings[i], resultStrings[i]);
    --- End diff --
    
    I think it will give more context, just as in the length comparison case.
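
To make the extra context concrete, here is a minimal sketch of the message that the format string in the diff above would produce. The `describe` helper and the sample arrays are hypothetical and are not part of `TestBaseUtils`. Only the format string is taken from the diff.

```java
import java.util.Arrays;

class ArrayMismatchMessageSketch {

    // Hypothetical helper that builds the message with the format string from the diff.
    static String describe(String[] expected, String[] actual) {
        return String.format(
            "Different elements in arrays. Expected %d elements: %s. Actual %d elements: %s",
            expected.length, Arrays.toString(expected),
            actual.length, Arrays.toString(actual));
    }

    public static void main(String[] args) {
        String[] expected = {"a", "b", "c", "d"};
        String[] actual = {"a", "b", "c"};
        // Prints both lengths together with the full contents of each array.
        System.out.println(describe(expected, actual));
    }
}
```

For these inputs the message lists `[a, b, c, d]` next to `[a, b, c]`, so the missing element is visible directly in the failure output.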



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90938279
  
    --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/ProjectionTest.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class ProjectionTest {
    --- End diff --
    
    Is this test class necessary?



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @vasia, thank you for merging my PR.
    Thank you for the reminder about the documentation. I've created a JIRA issue for it: https://issues.apache.org/jira/browse/FLINK-5311




[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90921785
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    + * A BipartiteEdge represents a link between a top and bottom vertices
    --- End diff --
    
    "between a top" -> "between top", or similar.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @mushketyk @vasia, thoughts on package naming? Should we create a new `org.apache.flink.bigraph` package? Another option would be `org.apache.flink.graph.bidirectional` which would suggest future package names like `org.apache.flink.graph.multi` and `org.apache.flink.graph.temporal`.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @greghogan, thank you for your review.
    I'll try to address your comments in the next couple of days.
    
    Best regards,
    Ivan.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90937555
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Projection.java ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple6;
    +
    +/**
    + * Result of projection of a connection between two vertices in a bipartite graph.
    + *
    + * @param <VK> the key type of vertices of an opposite set
    + * @param <VV> the value type of vertices of an opposite set
    + * @param <EV> the edge value type
    + */
    +public class Projection<VK, VV, EV, VVC> extends Tuple6<VK, VV, EV, EV, VVC, VVC> {
    --- End diff --
    
    Missing comment for documenting `VVC`. Should `EV` be placed before `VVC`? And before `VK` and `VV`?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90926220
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    --- End diff --
    
    What about "The vertices of a bipartite graph are divided into two disjoint sets, referenced by the names "top" and "bottom". Top and bottom vertices with the same key value represent distinct entities and must be specially handled when projecting to a simple {@link Graph}."?



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @vasia @greghogan I've updated the PR. Could you please give it another look?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81816707
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    +
    +		DataSet<Edge<TK, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					BipartiteEdge<TK, BK, EV> edge1 = value.f0;
    +					BipartiteEdge<TK, BK, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					return new Edge<>(
    --- End diff --
    
    We don't need to create new objects for each call to `map`. The `Edge` and `Tuple2` can be fields on the class. For examples look in `DegreeAnnotationFunctions.java`.
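
To illustrate the object-reuse suggestion above, here is a minimal sketch in plain Java. It does not use Flink: `SimpleEdge` and `Mapper` are hypothetical stand-ins for a mutable tuple type such as `Edge` and for a map function, and the field name `reuse` is an assumption made for illustration. The idea is that the output object is allocated once as a field and overwritten on each call.

```java
public class ObjectReuseSketch {

    /** Hypothetical stand-in for a mutable tuple type such as Edge. */
    static final class SimpleEdge {
        String source;
        String target;
        long value;
    }

    /** Hypothetical stand-in for a single-method map function. */
    interface Mapper<I, O> {
        O map(I input);
    }

    /** Allocates a fresh output object on every call. */
    static final class AllocatingMapper implements Mapper<String[], SimpleEdge> {
        @Override
        public SimpleEdge map(String[] pair) {
            SimpleEdge edge = new SimpleEdge();
            edge.source = pair[0];
            edge.target = pair[1];
            edge.value = 1L;
            return edge;
        }
    }

    /** Keeps one output object as a field and overwrites it on every call. */
    static final class ReusingMapper implements Mapper<String[], SimpleEdge> {
        private final SimpleEdge reuse = new SimpleEdge();

        @Override
        public SimpleEdge map(String[] pair) {
            reuse.source = pair[0];
            reuse.target = pair[1];
            reuse.value = 1L;
            return reuse;
        }
    }

    public static void main(String[] args) {
        ReusingMapper mapper = new ReusingMapper();
        SimpleEdge first = mapper.map(new String[] {"a", "b"});
        SimpleEdge second = mapper.map(new String[] {"c", "d"});
        // Both references point at the same instance, so the first result
        // has been overwritten by the second call.
        System.out.println(first == second);
        System.out.println(first.source);
    }
}
```

The trade-off is that the caller must consume or copy each result before the next call, since the returned instance is overwritten. Whether that holds in a given pipeline depends on how the runtime passes records between operators.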



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @vasia, @greghogan I've created a new package, moved the new classes there, and updated the PR according to your latest comments.
    
    Best regards,
    Ivan.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90924411
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    --- End diff --
    
    Are there not four methods? Can we simply reference these, i.e. "This can be achieved using the projection methods."?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90929479
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    --- End diff --
    
    The Edge and Tuple2 objects can be reused.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90929487
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    --- End diff --
    
    Can combine the filter and map.
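
One way to read the suggestion above is to replace the separate filter and map steps with a single function that emits zero or one result per joined pair. The sketch below shows that shape in plain Java without Flink. `BiEdge`, `TopEdge`, `projectPair`, and `projectTop` are hypothetical names, and the nested loop is only a naive stand-in for the self-join on the bottom vertex id. In Flink itself the same shape could presumably be expressed with a `FlatJoinFunction` and a `Collector`, but that is an assumption about how the PR might be changed, not what it does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FilterMapFusionSketch {

    /** Hypothetical stand-in for BipartiteEdge: (topId, bottomId, value). */
    static final class BiEdge {
        final String topId;
        final String bottomId;
        final int value;

        BiEdge(String topId, String bottomId, int value) {
            this.topId = topId;
            this.bottomId = bottomId;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a projected edge between two top vertices. */
    static final class TopEdge {
        final String source;
        final String target;
        final int leftValue;
        final int rightValue;

        TopEdge(String source, String target, int leftValue, int rightValue) {
            this.source = source;
            this.target = target;
            this.leftValue = leftValue;
            this.rightValue = rightValue;
        }
    }

    /** Emits zero or one output per joined pair, replacing a filter followed by a map. */
    static void projectPair(BiEdge left, BiEdge right, Consumer<TopEdge> out) {
        // The former filter: skip pairs that join a top vertex with itself.
        if (!left.topId.equals(right.topId)) {
            // The former map: build the projected edge from both input edges.
            out.accept(new TopEdge(left.topId, right.topId, left.value, right.value));
        }
    }

    /** Naive self-join on the bottom id; a stand-in for the join in the diff. */
    static List<TopEdge> projectTop(List<BiEdge> edges) {
        List<TopEdge> result = new ArrayList<>();
        for (BiEdge left : edges) {
            for (BiEdge right : edges) {
                if (left.bottomId.equals(right.bottomId)) {
                    projectPair(left, right, result::add);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<BiEdge> edges = new ArrayList<>();
        edges.add(new BiEdge("t1", "b1", 10));
        edges.add(new BiEdge("t2", "b1", 20));
        edges.add(new BiEdge("t3", "b2", 30));
        for (TopEdge e : projectTop(edges)) {
            System.out.println(e.source + " -> " + e.target);
        }
    }
}
```

Fusing the two steps saves one operator and one pass over the intermediate pairs. The cost is that the filtering condition is no longer visible as a separate step in the pipeline.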



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81787458
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    --- End diff --
    
    Good point. Will update.


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81780392
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Projection.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple4;
    +
    +/**
    + * Result of projection of a connection between two vertices in a bipartite graph.
    + *
    + * @param <VK> the key type of vertices of an opposite set
    + * @param <VV> the value type of vertices of an opposite set
    + * @param <EV> the edge value type
    + */
    +public class Projection<VK, VV, EV> extends Tuple4<VK, VV, EV, EV> {
    +	public Projection() {}
    +
    +	public Projection(Vertex<VK, VV> vertex, EV edgeValue1, EV edgeValue2) {
    +		this.f0 = vertex.getId();
    +		this.f1 = vertex.getValue();
    +		this.f2 = edgeValue1;
    +		this.f3 = edgeValue2;
    +	}
    +
    +	public VK getVertexId() {
    +		return this.f0;
    +	}
    +
    +	public VV getVertexValue() {
    +		return this.f1;
    +	}
    +
    +	public EV getEdgeValue1() {
    --- End diff --
    
    Can we now call this the "source" value (and `Value2` the "target" value)?
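
A sketch of what the suggested naming might look like, assuming the first edge value belongs to the bipartite edge at the projected edge's source vertex and the second to the one at its target vertex. `ProjectionSketch` and its accessor names are hypothetical and not taken from the PR; the real class extends Flink's `Tuple4`, which is replaced here by plain fields so that the example compiles on its own.

```java
/** Hypothetical, Flink-free stand-in for the Projection class under review. */
final class ProjectionSketch<VK, VV, EV> {
    private final VK vertexId;        // ID of the connecting vertex from the opposite set
    private final VV vertexValue;     // value of that connecting vertex
    private final EV sourceEdgeValue; // was "edgeValue1" in the reviewed diff
    private final EV targetEdgeValue; // was "edgeValue2" in the reviewed diff

    ProjectionSketch(VK vertexId, VV vertexValue, EV sourceEdgeValue, EV targetEdgeValue) {
        this.vertexId = vertexId;
        this.vertexValue = vertexValue;
        this.sourceEdgeValue = sourceEdgeValue;
        this.targetEdgeValue = targetEdgeValue;
    }

    VK getVertexId() {
        return vertexId;
    }

    VV getVertexValue() {
        return vertexValue;
    }

    // Value of the bipartite edge attached to the projected edge's source vertex.
    EV getSourceEdgeValue() {
        return sourceEdgeValue;
    }

    // Value of the bipartite edge attached to the projected edge's target vertex.
    EV getTargetEdgeValue() {
        return targetEdgeValue;
    }
}
```

With names like these, a caller no longer has to remember which of the two numbered values goes with which end of the projected edge.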


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r91613888
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,16 +480,22 @@ protected static File asFile(String path) {
     				resultStrings[i] = (val == null) ? "null" : val.toString();
     			}
     		}
    -		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +
    +		//
    +		String msg = String.format(
    +			"Different elements in arrays. Expected %d elements: %s. Actual %d elements: %s",
    +			expectedStrings.length, Arrays.toString(expectedStrings),
    +			resultStrings.length, Arrays.toString(resultStrings));
    +
    +		assertEquals(msg, expectedStrings.length, resultStrings.length);
     
     		if (sort) {
     			Arrays.sort(expectedStrings);
     			Arrays.sort(resultStrings);
     		}
     		
     		for (int i = 0; i < expectedStrings.length; i++) {
    -			assertEquals(expectedStrings[i], resultStrings[i]);
    +			assertEquals(msg, expectedStrings[i], resultStrings[i]);
    --- End diff --
    
    Do we need to include `msg` here?
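
A minimal sketch of one way to read this question, assuming the intent is to build the detailed message only for the length check and let each element comparison report just the mismatching pair. It throws a plain `AssertionError` instead of calling JUnit so that it runs standalone; `CompareSketch` and `compare` are hypothetical names, not the `TestBaseUtils` API.

```java
import java.util.Arrays;

final class CompareSketch {

    /** Compares two string arrays, optionally ignoring order. */
    static void compare(String[] expected, String[] actual, boolean sort) {
        // The full dump of both arrays is only useful when the sizes differ,
        // because then there is no single mismatching pair to point at.
        if (expected.length != actual.length) {
            throw new AssertionError(String.format(
                "Different elements in arrays. Expected %d elements: %s. Actual %d elements: %s",
                expected.length, Arrays.toString(expected),
                actual.length, Arrays.toString(actual)));
        }

        // Work on copies so the caller's arrays are not reordered.
        String[] e = expected.clone();
        String[] a = actual.clone();
        if (sort) {
            Arrays.sort(e);
            Arrays.sort(a);
        }

        // Element checks report only the offending pair and its index.
        for (int i = 0; i < e.length; i++) {
            if (!e[i].equals(a[i])) {
                throw new AssertionError(
                    "Mismatch at index " + i + ": expected <" + e[i] + "> but was <" + a[i] + ">");
            }
        }
    }

    public static void main(String[] args) {
        // Same elements in a different order pass when sorting is enabled.
        compare(new String[] {"b", "a"}, new String[] {"a", "b"}, true);
        System.out.println("arrays match");
    }
}
```

Whether the long message should be repeated on every element assertion is a trade-off: it makes a failure self-explanatory, but repeats a potentially large dump that the per-element message already narrows down.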


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90938901
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,7 +480,8 @@ protected static File asFile(String path) {
     			}
     		}
     		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +		assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
    --- End diff --
    
    The array contents are compared in the assertions that follow the test for length.


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81781488
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    --- End diff --
    
    Also, I know it was discussed that we only have a Tuple2 of edge values, but I'm double-checking that we don't also want to include the (here: bottom) vertex ID in the new edge value.
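
To make the question concrete, here is a minimal, Flink-free sketch of a top projection in which each projected edge also records the ID of the bottom vertex that connects the two top vertices. All names (`TopProjectionSketch`, `BEdge`, `ProjectedEdge`, `projectTop`) are hypothetical and not part of the PR; plain lists stand in for `DataSet`, and a nested loop stands in for a self-join of the edge set on the bottom ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TopProjectionSketch {

    /** Hypothetical stand-in for a bipartite edge: (top ID, bottom ID, edge value). */
    static final class BEdge {
        final String topId;
        final String bottomId;
        final int value;

        BEdge(String topId, String bottomId, int value) {
            this.topId = topId;
            this.bottomId = bottomId;
            this.value = value;
        }
    }

    /** Hypothetical projected edge that keeps the connecting bottom ID next to both edge values. */
    static final class ProjectedEdge {
        final String sourceTopId;
        final String targetTopId;
        final String bottomId;
        final int sourceValue;
        final int targetValue;

        ProjectedEdge(String sourceTopId, String targetTopId, String bottomId,
                int sourceValue, int targetValue) {
            this.sourceTopId = sourceTopId;
            this.targetTopId = targetTopId;
            this.bottomId = bottomId;
            this.sourceValue = sourceValue;
            this.targetValue = targetValue;
        }
    }

    /** Self-join on the bottom ID, dropping pairs that share the same top vertex. */
    static List<ProjectedEdge> projectTop(List<BEdge> edges) {
        List<ProjectedEdge> result = new ArrayList<>();
        for (BEdge e1 : edges) {
            for (BEdge e2 : edges) {
                boolean sameBottom = e1.bottomId.equals(e2.bottomId);
                boolean differentTop = !e1.topId.equals(e2.topId);
                if (sameBottom && differentTop) {
                    result.add(new ProjectedEdge(
                        e1.topId, e2.topId, e1.bottomId, e1.value, e2.value));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<BEdge> edges = Arrays.asList(
            new BEdge("a", "x", 1),
            new BEdge("b", "x", 2),
            new BEdge("a", "y", 3));
        for (ProjectedEdge e : projectTop(edges)) {
            System.out.println(e.sourceTopId + " -> " + e.targetTopId + " via " + e.bottomId
                + " (" + e.sourceValue + ", " + e.targetValue + ")");
        }
    }
}
```

Keeping the bottom ID costs one extra field per projected edge, but it lets a consumer tell apart parallel edges between the same two top vertices that arise from different shared bottom vertices.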


---

[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Thanks for the reminder @vasia. The separate JIRA sub-task does allow for a discussion of how best to document the full set of proposed bipartite functionality.


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81777101
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    + * A BipartiteEdge represents a link between a top and bottom vertices
    + * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
    + * with the only difference that key of connected vertices can have
    + * different types.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <EV> the edge value type
    + */
    +public class BipartiteEdge<TK, BK, EV> extends Tuple3<TK, BK, EV> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public BipartiteEdge(){}
    --- End diff --
    
    Extra space for `() {}`.


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90930301
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a data about projected connection
    +	 * between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull() {
    --- End diff --
    
    Should we place the code for `projectionTopFull` before `projectionBottomFull`, as with the simple projections?


---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90933026
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    --- End diff --
    
    Add forwarded fields annotation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90924109
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * A bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    --- End diff --
    
    For VVT (and similarly for VVB), how about "the vertex value type for top vertices"?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90921530
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    --- End diff --
    
    Empty line.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2564



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90931164
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * A bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains data about the projected connection
    +	 * between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull() {
    +
    +		// Join edges with bottom vertices to preserve edges values
    +		DataSet<Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> ds = edges.join(bottomVertices)
    +			.where(1)
    +			.equalTo(0);
    +
    +		DataSet<Edge<KB, Projection<KT, VVT, EV, VVB>>> newEdges = ds.join(ds)
    +			.where(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>, KT>() {
    --- End diff --
    
    We can say `.where("f0.getTopId()")` rather than using a KeySelector. We could also use `.where("f0.f0")`.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @greghogan, I've fixed the PR according to your review.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90937167
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * A bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g. there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains data about the projected connection
    +	 * between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull() {
    +
    +		// Join edges with bottom vertices to preserve edges values
    +		DataSet<Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> ds = edges.join(bottomVertices)
    +			.where(1)
    +			.equalTo(0);
    +
    +		DataSet<Edge<KB, Projection<KT, VVT, EV, VVB>>> newEdges = ds.join(ds)
    +			.where(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>, KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>> value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			.equalTo(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>, KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>> value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			// Filter all pairs with the same bottom id
    +			.filter(new FilterFunction<Tuple2<
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>>>() {
    +				@Override
    +				public boolean filter(Tuple2<
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1.f0;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			// Join with top vertices to preserve top vertices values
    +			.join(topVertices)
    --- End diff --
    
    Isn't it more efficient to do the Cartesian join last if we assume that |vertices| << |bipartite edges| << |simple edges|? The code can be reused between the full projection functions: first join the bipartite edges with the top vertices, then join the result with the bottom vertices (perhaps using the Projection class below with NullValue where appropriate).
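
The size argument in the comment above can be illustrated with a toy cost model. This is a minimal plain-Java sketch, not Flink code: `JoinOrderSketch`, `BEdge` and `star` are hypothetical names introduced only for illustration, and the model assumes the edge list has no duplicate (top, bottom) pairs. It counts how many records would reach the vertex-value joins of a bottom projection, depending on whether the self-join on the top id runs before or after them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy cost model for the join-order argument; plain Java, not Flink code. */
final class JoinOrderSketch {

    /** A bipartite edge reduced to its two endpoint ids (hypothetical helper type). */
    static final class BEdge {
        final String top;
        final String bottom;

        BEdge(String top, String bottom) {
            this.top = top;
            this.bottom = bottom;
        }
    }

    /**
     * Size of the self-join output for a bottom projection: ordered pairs of
     * edges that share a top vertex but have different bottom vertices.
     * Assumes the edge list contains no duplicate (top, bottom) pairs.
     */
    static long simpleEdgeCount(List<BEdge> edges) {
        Map<String, Integer> degreeByTop = new HashMap<>();
        for (BEdge e : edges) {
            degreeByTop.merge(e.top, 1, Integer::sum);
        }
        long pairs = 0;
        for (int degree : degreeByTop.values()) {
            // Each top vertex of degree d yields d * (d - 1) ordered pairs.
            pairs += (long) degree * (degree - 1);
        }
        return pairs;
    }

    /** Records entering the vertex-value joins when the self-join runs first. */
    static long vertexJoinInputIfSelfJoinFirst(List<BEdge> edges) {
        return simpleEdgeCount(edges);
    }

    /** Records entering the vertex-value joins when the self-join runs last. */
    static long vertexJoinInputIfSelfJoinLast(List<BEdge> edges) {
        return edges.size();
    }

    /** One top vertex connected to {@code n} distinct bottom vertices. */
    static List<BEdge> star(int n) {
        List<BEdge> edges = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            edges.add(new BEdge("t0", "b" + i));
        }
        return edges;
    }
}
```

For `JoinOrderSketch.star(100)` the model reports 9900 records when the self-join runs first and 100 when it runs last. Actual costs in Flink would also depend on record width and on the join strategy the optimizer picks, which this sketch ignores.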



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Tuple8 does not seem too friendly to me either. What do you mean by "attaching the labels"? Is it something similar to what we do with the Edge/Vertex classes right now, i.e. inheriting from Tuple and providing getters and setters to access the values in it? Or is there some other way to attach labels to tuples?
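
The pattern mentioned above (a tuple subclass that names its positional fields through getters and setters) might look roughly like the following. This is a self-contained sketch under stated assumptions: `Triple` is a simplified, hypothetical stand-in for Flink's `Tuple3`, and `LabeledEdgeSketch` is an illustrative name, not a class from the PR.

```java
/** Minimal positional tuple; a simplified, hypothetical stand-in for Flink's Tuple3. */
class Triple<A, B, C> {
    public A f0;
    public B f1;
    public C f2;

    Triple() {}

    Triple(A f0, B f1, C f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

/** Gives the positional fields readable names, in the style of the Edge/Vertex classes. */
final class LabeledEdgeSketch<TK, BK, EV> extends Triple<TK, BK, EV> {

    LabeledEdgeSketch() {}

    LabeledEdgeSketch(TK topId, BK bottomId, EV value) {
        super(topId, bottomId, value);
    }

    TK getTopId() {
        return f0;
    }

    void setTopId(TK topId) {
        this.f0 = topId;
    }

    BK getBottomId() {
        return f1;
    }

    void setBottomId(BK bottomId) {
        this.f1 = bottomId;
    }

    EV getValue() {
        return f2;
    }

    void setValue(EV value) {
        this.f2 = value;
    }
}
```

Callers can then write `edge.getTopId()` instead of `edge.f0`, while the object still exposes positional fields for code that works on tuples.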



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    I would go for `org.apache.flink.graph.bipartite`. I think that `bidirectional` simply suggests that each edge exists in both directions.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81787532
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    + * A BipartiteEdge represents a link between a top and bottom vertices
    + * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
    + * with the only difference that key of connected vertices can have
    + * different types.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <EV> the edge value type
    + */
    +public class BipartiteEdge<TK, BK, EV> extends Tuple3<TK, BK, EV> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public BipartiteEdge(){}
    --- End diff --
    
    Good catch.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    What I meant is simply creating an edge with a Tuple2 label containing the labels of the two edges in the bipartite graph. Makes sense?
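The idea of labelling each projected edge with the pair of original edge values can be sketched on in-memory collections. This is a plain-Java illustration, not Flink code: `TopProjectionSketch`, `BEdge` and `ProjectedEdge` are hypothetical names, and grouping by the bottom id stands in for the self-join on the bottom-vertex field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopProjectionSketch {

    /** Bipartite edge: top vertex id, bottom vertex id, edge value. */
    static final class BEdge {
        final String top;
        final String bottom;
        final int value;

        BEdge(String top, String bottom, int value) {
            this.top = top;
            this.bottom = bottom;
            this.value = value;
        }
    }

    /** Edge between two top vertices, labelled with both original edge values. */
    static final class ProjectedEdge {
        final String source;
        final String target;
        final int firstValue;
        final int secondValue;

        ProjectedEdge(String source, String target, int firstValue, int secondValue) {
            this.source = source;
            this.target = target;
            this.firstValue = firstValue;
            this.secondValue = secondValue;
        }

        @Override
        public String toString() {
            return source + "->" + target + "(" + firstValue + "," + secondValue + ")";
        }
    }

    static List<ProjectedEdge> projectTop(List<BEdge> edges) {
        // Group edges by bottom id: the in-memory analogue of joining edges with themselves.
        Map<String, List<BEdge>> byBottom = new HashMap<>();
        for (BEdge e : edges) {
            byBottom.computeIfAbsent(e.bottom, k -> new ArrayList<>()).add(e);
        }

        List<ProjectedEdge> result = new ArrayList<>();
        for (List<BEdge> group : byBottom.values()) {
            for (BEdge left : group) {
                for (BEdge right : group) {
                    // Skip pairs that would connect a top vertex to itself.
                    if (!left.top.equals(right.top)) {
                        result.add(new ProjectedEdge(left.top, right.top, left.value, right.value));
                    }
                }
            }
        }
        return result;
    }
}
```

Like a self-join, this sketch emits each connected pair in both directions; whether one or two edges per pair are wanted is a separate design decision.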



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Hi @greghogan. Thank you for the clarification. I'll update the code accordingly. Do you have any other comments regarding the PR?



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    New **gelly** tests failed with errors like:
    
    > Caused by: java.io.IOException: Insufficient number of network buffers: required 32, but only 3 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
    
    Do you know what is causing this error? Should I update the code somehow?
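The error message names the `taskmanager.network.numberOfBuffers` key. As a rough sizing aid, Flink's configuration documentation from around this release gives the rule of thumb `#slots-per-TM^2 * #TMs * 4` for that value. The sketch below only evaluates that formula; the class name, method name and the slot/TaskManager counts are made up for illustration. Whether raising the value is the right fix for this particular test failure is not established in this thread.

```java
final class NetworkBufferEstimate {

    /**
     * Rule-of-thumb estimate: slotsPerTaskManager^2 * taskManagers * 4.
     * Both inputs are illustrative; real values depend on the cluster or test setup.
     */
    static long recommendedBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }
}
```

For example, under this formula 16 slots per TaskManager on 2 TaskManagers already reach the total of 2048 buffers mentioned in the error message.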



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Makes sense to me. I'll implement this over the weekend.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90935587
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only bottom vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a data about projected connection
    +	 * between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull() {
    +
    +		// Join edges with bottom vertices to preserve edges values
    +		DataSet<Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> ds = edges.join(bottomVertices)
    --- End diff --
    
    Use a JoinHint to repartition-hash the vertices?
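For context: in Flink's DataSet API a `JoinHint` can be passed as the second argument of `join`, and `JoinHint.REPARTITION_HASH_SECOND` asks for both inputs to be repartitioned with a hash table built from the second input (here, the vertices). The plain-Java sketch below illustrates only that strategy on in-memory collections. It is not Flink code, and the class name, method names and data layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RepartitionHashJoinSketch {

    /** Assigns a key to one of n partitions by hash, as a shuffle would. */
    static int partitionOf(Object key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    /**
     * Joins edges (String[]{topId, bottomId}) with vertices (bottomId -> value)
     * and returns "topId,bottomId,vertexValue" strings.
     */
    static List<String> join(List<String[]> edges, Map<String, String> vertices, int partitions) {
        List<List<String[]>> edgeParts = new ArrayList<>();
        List<Map<String, String>> vertexTables = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            edgeParts.add(new ArrayList<>());
            vertexTables.add(new HashMap<>());
        }

        // Step 1: repartition the first input (edges) by the join key.
        for (String[] edge : edges) {
            edgeParts.get(partitionOf(edge[1], partitions)).add(edge);
        }
        // Step 2: repartition the second input (vertices) and build one hash table per partition.
        for (Map.Entry<String, String> vertex : vertices.entrySet()) {
            vertexTables.get(partitionOf(vertex.getKey(), partitions))
                .put(vertex.getKey(), vertex.getValue());
        }

        // Step 3: probe each partition's hash table with the edges of the same partition.
        List<String> joined = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            for (String[] edge : edgeParts.get(p)) {
                String vertexValue = vertexTables.get(p).get(edge[1]);
                if (vertexValue != null) {
                    joined.add(edge[0] + "," + edge[1] + "," + vertexValue);
                }
            }
        }
        return joined;
    }
}
```

Building the hash table from the vertex side is attractive when the vertex set is the smaller input, since only that side has to be held in memory per partition.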



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @greghogan @vasia I've updated the code according to your suggestions.
    The only thing that I did differently: the more complete versions of the bottom/top projections return a Tuple4 that contains the vertex key, the vertex value and the values of two vertices. I assumed that to get the values of the two other vertices I would need to perform two more joins, which would make the method much slower, while a user can do this with the result of the method if needed.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81225166
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,7 +480,8 @@ protected static File asFile(String path) {
     			}
     		}
     		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +		assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
    --- End diff --
    
    Doesn't IntelliJ offer to view the different results?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81246840
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
    + * of top vertices.
    + *
    + * <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
    + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VT> the top vertices value type
    + * @param <VB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VT, VB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VT>> topVertices;
    +	private final DataSet<Vertex<KB, VB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
    +	 *
    +	 * <p>Caller should provide a function that will create an edge between two top vertices. This function will receive
    --- End diff --
    
    I thought that different datasets would require different algorithms to consolidate a number of connections into a single edge value. Hence the callback function. But I think Greg's idea is better.



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    @greghogan 
    Thank you for the suggestion.
    The build is passing now.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81226805
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
    + * of top vertices.
    + *
    + * <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#topProjection(GroupReduceFunction)} or
    + * {@link BipartiteGraph#bottomProjection(GroupReduceFunction)} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VT> the top vertices value type
    + * @param <VB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VT, VB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VT>> topVertices;
    +	private final DataSet<Vertex<KB, VB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they both connected to.
    +	 *
    +	 * <p>Caller should provide a function that will create an edge between two top vertices. This function will receive
    +	 * a collection of all connections between each pair of top vertices. Note that this function will be called twice for
    +	 * each pair of connected vertices, so it's up to a caller if one or two edges should be created.
    +	 *
    +	 * @param edgeFactory function that will be used to create edges in the new graph
    +	 * @param <NEV> the edge value type in the new graph
    +	 * @return top projection of the bipartite graph
    +	 */
    +	public <NEV> Graph<KT, VT, NEV> topProjection(
    +		final GroupReduceFunction<Tuple2<
    +									Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>,
    +									Vertex<KB, VB>>,
    +								Edge<KT, NEV>> edgeFactory) {
    +
    +		DataSet<Edge<KT, NEV>> newEdges = edges.join(edges)
    +			.where(new TopProjectionKeySelector<KT, KB, EV>())
    --- End diff --
    
    Using the field index should be faster than a key selector, and allows the optimizer to reuse sorted fields.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81217922
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    --- End diff --
    
    I would rephrase that to "... a graph whose vertices can be divided into two disjoint sets"



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r91585475
  
    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
    @@ -480,7 +480,8 @@ protected static File asFile(String path) {
     			}
     		}
     		
    -		assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
    +		assertEquals(String.format("Wrong number of elements result. Expected: %s. Result: %s.", Arrays.toString(expectedStrings), Arrays.toString(resultStrings)),
    --- End diff --
    
    What if we moved `String.format` to its own line, included in the string both the array lengths and contents, and added a comment to describe why we are also printing the full arrays?
    
    Also, should the arrays be printed on new lines such that they would line up until the diverging element? We'll need to move the sorting of the arrays before the length check.
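
    A minimal sketch of what this suggestion could look like, in plain Java rather than the actual `TestBaseUtils` code. The class and method names (`ResultComparison`, `describeMismatch`, `compare`) are hypothetical, and `AssertionError` stands in for JUnit's `assertEquals`:

```java
import java.util.Arrays;

final class ResultComparison {

    /** Builds the failure message separately, with both lengths and full contents. */
    static String describeMismatch(String[] expected, String[] result) {
        // The full arrays are printed on separate lines so a reader can scan
        // them side by side and find the first diverging element, which the
        // two lengths alone would not reveal.
        return String.format(
            "Wrong number of elements in result. Expected %d, got %d.%nExpected: %s%nResult:   %s",
            expected.length, result.length,
            Arrays.toString(expected), Arrays.toString(result));
    }

    /** Sorts copies of both arrays first, then checks the length, then the contents. */
    static void compare(String[] expected, String[] result) {
        String[] e = expected.clone();
        String[] r = result.clone();
        // Sorting happens before the length check so the printed arrays line up.
        Arrays.sort(e);
        Arrays.sort(r);
        if (e.length != r.length) {
            throw new AssertionError(describeMismatch(e, r));
        }
        for (int i = 0; i < e.length; i++) {
            if (!e[i].equals(r[i])) {
                throw new AssertionError(
                    "Element " + i + " differs: expected " + e[i] + ", got " + r[i]);
            }
        }
    }
}
```

    Keeping the message construction in its own method is one way to move `String.format` out of the assertion call; whether the real change should do the same is a design choice for the PR.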



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81246465
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---
    @@ -34,10 +34,10 @@
     
     	public Edge(){}
     
    -	public Edge(K src, K trg, V val) {
    -		this.f0 = src;
    -		this.f1 = trg;
    -		this.f2 = val;
    +	public Edge(K source, K target, V value) {
    --- End diff --
    
    To make them consistent with the naming style in other classes.
    Are you suggesting that I revert this?



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    The advantage of joining on vertex values before the grouped cross is that the number of projected edges is quadratic in the vertex degree, so the projected graphs will usually be much larger than the bipartite graph.
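
    To illustrate the quadratic growth described in this comment, here is a minimal sketch in plain Java with no Flink dependency. The names `ProjectionSize` and `projectedEdgeCount` are hypothetical. It assumes edges are given as `{topId, bottomId}` pairs without duplicates, and that the projection emits one directed edge per ordered pair of distinct top vertices sharing a bottom vertex, which is what the join-and-filter in `simpleTopProjection` appears to do:

```java
import java.util.HashMap;
import java.util.Map;

final class ProjectionSize {

    /**
     * Counts the directed edges a top projection would emit: a bottom vertex
     * with degree d contributes d * (d - 1) ordered pairs of distinct top vertices.
     */
    static long projectedEdgeCount(long[][] bipartiteEdges) {
        Map<Long, Long> bottomDegree = new HashMap<>();
        for (long[] edge : bipartiteEdges) {
            // edge[0] is the top vertex ID, edge[1] is the bottom vertex ID
            bottomDegree.merge(edge[1], 1L, Long::sum);
        }
        long count = 0;
        for (long degree : bottomDegree.values()) {
            count += degree * (degree - 1);
        }
        return count;
    }
}
```

    Under these assumptions, four bipartite edges into a single bottom vertex already produce twelve projected edges, so any join performed after the cross has to process the larger data set.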



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81764640
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    --- End diff --
    
    Would the generic parameters be easier to read as `public class BipartiteGraph<KT, KB, VVT, VVB, EV> {`?



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Thanks for the update @mushketyk and for the review @greghogan. I agree with your suggestions. For the type parameters I would go for `<KT, KB, VVT, VVB, EV>`. Let me know if there's any other issue you'd like my opinion on.



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81797332
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    +
    +		DataSet<Edge<TK, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					BipartiteEdge<TK, BK, EV> edge1 = value.f0;
    +					BipartiteEdge<TK, BK, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					return new Edge<>(
    --- End diff --
    
    I don't think I understand what you mean. Could you elaborate please?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90923574
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteEdge.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +/**
    + *
    + * A BipartiteEdge represents a link between a top and bottom vertices
    + * in a {@link BipartiteGraph}. It is similar to the {@link Edge} class
    + * with the only difference that key of connected vertices can have
    + * different types.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <EV> the edge value type
    + */
    +public class BipartiteEdge<KT, KB, EV> extends Tuple3<KT, KB, EV> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public BipartiteEdge() {}
    +
    +	public BipartiteEdge(KT topId, KB bottomId, EV value) {
    +		this.f0 = topId;
    +		this.f1 = bottomId;
    +		this.f2 = value;
    +	}
    +
    +	public KT getTopId() {
    +		return this.f0;
    +	}
    +
    +	public void setTopId(KT i) {
    --- End diff --
    
    Parameter name "i" -> "topId"? Also, below for "i" -> "bottomId" and "newValue" -> "value"?
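
    A sketch of the accessors after the suggested renaming. This is a stand-alone class, not the PR's code: `BipartiteEdgeSketch` is a hypothetical name, the public fields `f0`, `f1`, `f2` stand in for those inherited from Flink's `Tuple3`, and the methods other than `getTopId` and `setTopId` do not appear in the quoted diff, so their names are assumptions:

```java
final class BipartiteEdgeSketch<KT, KB, EV> {

    // Stand-ins for the public fields that Tuple3 would provide.
    public KT f0;
    public KB f1;
    public EV f2;

    BipartiteEdgeSketch() {}

    BipartiteEdgeSketch(KT topId, KB bottomId, EV value) {
        this.f0 = topId;
        this.f1 = bottomId;
        this.f2 = value;
    }

    public KT getTopId() {
        return this.f0;
    }

    // Parameter named after what it sets, instead of "i".
    public void setTopId(KT topId) {
        this.f0 = topId;
    }

    public KB getBottomId() {
        return this.f1;
    }

    public void setBottomId(KB bottomId) {
        this.f1 = bottomId;
    }

    public EV getValue() {
        return this.f2;
    }

    // "value" instead of "newValue", matching the constructor parameter.
    public void setValue(EV value) {
        this.f2 = value;
    }
}
```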



[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2564
  
    Thanks @mushketyk. @greghogan are you shepherding this PR or shall I?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r86694144
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    --- End diff --
    
    Sorry, I didn't get your point. Could you please elaborate on this?



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81779541
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph whose vertices can be divided into two disjoint sets: top vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertices sets. E.g. there can be no vertices between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using
    + * {@link BipartiteGraph#simpleTopProjection()} or
    + * {@link BipartiteGraph#fullBottomProjection()} methods.
    + *
    + * @param <TK> the key type of the top vertices
    + * @param <BK> the key type of the bottom vertices
    + * @param <TV> the top vertices value type
    + * @param <BV> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<TK, BK, TV, BV, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<TK, TV>> topVertices;
    +	private final DataSet<Vertex<BK, BV>> bottomVertices;
    +	private final DataSet<BipartiteEdge<TK, BK, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<TK, TV>> topVertices,
    +			DataSet<Vertex<BK, BV>> bottomVertices,
    +			DataSet<BipartiteEdge<TK, BK, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<TK, TV>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<BK, BV>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<TK, BK, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite into a graph that contains only top vertices. An edge between two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<TK, TV, Tuple2<EV, EV>> simpleTopProjection() {
    +
    +		DataSet<Edge<TK, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					BipartiteEdge<TK, BK, EV> edge1 = value.f0;
    +					BipartiteEdge<TK, BK, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>>, Edge<TK, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<TK, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<TK, BK, EV>, BipartiteEdge<TK, BK, EV>> value) throws Exception {
    +					return new Edge<>(
    --- End diff --
    
    The `Edge` and nested `Tuple2` can be reused.
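
    One reading of this comment, sketched in plain Java: allocate the output `Edge` and its nested `Tuple2` once per function instance and overwrite their fields on every call, instead of creating new objects per record. All classes below are simplified stand-ins for the Flink and Gelly types, `ProjectEdges` is a hypothetical name, and `java.util.function.Function` takes the place of Flink's `MapFunction`:

```java
import java.util.function.Function;

final class ObjectReuseSketch {

    // Simplified stand-ins for Flink's Tuple2 and Gelly's Edge and BipartiteEdge.
    static final class Tuple2<A, B> { A f0; B f1; }
    static final class Edge<K, V> { K source; K target; V value; }
    static final class BipartiteEdge<KT, KB, EV> {
        final KT topId; final KB bottomId; final EV value;
        BipartiteEdge(KT topId, KB bottomId, EV value) {
            this.topId = topId; this.bottomId = bottomId; this.value = value;
        }
    }

    /** Maps a joined pair of bipartite edges to a projected edge, reusing the output objects. */
    static final class ProjectEdges<KT, KB, EV> implements
            Function<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>,
                     Edge<KT, Tuple2<EV, EV>>> {

        // Allocated once; every call overwrites the fields instead of allocating.
        private final Tuple2<EV, EV> values = new Tuple2<>();
        private final Edge<KT, Tuple2<EV, EV>> edge = new Edge<>();

        ProjectEdges() {
            edge.value = values;
        }

        @Override
        public Edge<KT, Tuple2<EV, EV>> apply(
                Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> joined) {
            edge.source = joined.f0.topId;
            edge.target = joined.f1.topId;
            values.f0 = joined.f0.value;
            values.f1 = joined.f1.value;
            return edge;
        }
    }
}
```

    The trade-off is that the returned instance is only valid until the next call, so a consumer that needs to keep a result must copy it first.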



[GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r81218056
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * Bipartite graph is a graph that contains two sets of vertices: top vertices and bottom vertices. Edges can only exist
    + * between a pair of vertices from different vertices sets. E.g. there can be no vertices between a pair
    + * of top vertices.
    + *
    + * <p>Bipartite graph is useful to represent graphs with two sets of objects, like researchers and their publications,
    --- End diff --
    
    Bipartite graphs are useful...

