Posted to issues@flink.apache.org by ssaumitra <gi...@git.apache.org> on 2015/10/10 13:53:28 UTC

[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

GitHub user ssaumitra opened a pull request:

    https://github.com/apache/flink/pull/1250

    [FLINK-2714] [Gelly] Copy triangle counting logic from EnumTrianglesOpt.java to Gelly library.

     Also reorganizing classes to use Gelly's Graph APIs.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ssaumitra/flink FLINK_2714

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1250
    
----
commit 7a8f24331bb2d0ad7b050745d832ba4ce777982e
Author: Saumitra Shahapure <sa...@gmail.com>
Date:   2015-10-09T21:51:45Z

    [FLINK-2714] Copy triangle counting logic from EnumTrianglesOpt.java to Gelly library. Also reorganizing classes to use Gelly's Graph APIs.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---


Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-148663100
  
    Thanks a lot for the quick update @ssaumitra! It looks good now :)



Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42108353
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    + * A triangle consists of three edges that connect three vertices with each other.
    + * <p>
    + * <p>
    + * The basic algorithm works as follows:
    + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
    + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
    + * that closes the triangle.
    + * <p>
    + * <p>
    + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
    + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
    + * reduce the number of triads.
    + * This implementation extends the basic algorithm by computing output degrees of edge vertices and
    + * grouping on edges on the vertex with the smaller degree.
    + */
    +
    +public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
    +	@Override
    +	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
    +
    +		DataSet<Edge<K, EV>> edges = input.getEdges();
    +
    +		// annotate edges with degrees
    +		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
    +				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
    +				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
    +
    +		// project edges by degrees
    +		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
    +		// project edges by vertex id
    +		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
    +
    +		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
    +				// build triads
    +				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
    +				.reduceGroup(new TriadBuilder())
    +				// filter triads
    +				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
    +
    +		return triangles;
    +	}
    +
    +	/**
    +	 * Emits for an edge the original edge and its switched version.
    +	 */
    +	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
    +
    +		@Override
    +		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
    +			out.collect(edge);
    +			Edge<K, EV> reversed = edge.reverse();
    +			out.collect(reversed);
    +		}
    +	}
    +
    +	/**
    +	 * Counts the number of edges that share a common vertex.
    +	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
    +	 * For each emitted edge, the first vertex is the vertex with the smaller id.
    +	 */
    +	private static final class DegreeCounter<K extends Comparable<K>, EV>
    --- End diff --
    
    As I understand, you are suggesting the signature to be
    ```java
    private static final class EdgeDuplicator<K> implements FlatMapFunction<Edge<K, EV>, Edge<K, NullValue>>
    ```
    But since this takes the raw edges passed by the caller, `EV` is still necessary, right?
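
To make the question concrete, here is a minimal sketch of the point about `EV`. It does not use Flink: `SketchEdge` and `NoValue` are hypothetical stand-ins for Gelly's `Edge` and Flink's `NullValue`, and `EdgeDuplicatorSketch` is an illustrative name, not the class in the pull request. Under those assumptions it shows that a duplicator can emit value-free edges while still having to declare `EV`, because `EV` appears in its input type.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's NullValue; not the real class. */
final class NoValue {
    static final NoValue INSTANCE = new NoValue();

    private NoValue() {}
}

/** Hypothetical stand-in for Gelly's Edge<K, EV>; not the real class. */
final class SketchEdge<K, EV> {
    final K source;
    final K target;
    final EV value;

    SketchEdge(K source, K target, EV value) {
        this.source = source;
        this.target = target;
        this.value = value;
    }
}

/**
 * Accepts edges carrying any value type EV and emits the edge plus its
 * reversed copy, both without a value. EV must remain a type parameter
 * because the input edges still carry it.
 */
final class EdgeDuplicatorSketch<K, EV> {
    List<SketchEdge<K, NoValue>> duplicate(SketchEdge<K, EV> edge) {
        List<SketchEdge<K, NoValue>> out = new ArrayList<>();
        // original direction, edge value dropped
        out.add(new SketchEdge<>(edge.source, edge.target, NoValue.INSTANCE));
        // reversed direction, edge value dropped
        out.add(new SketchEdge<>(edge.target, edge.source, NoValue.INSTANCE));
        return out;
    }
}
```

Dropping `EV` from the class's type parameters, as in the signature quoted above, would leave `EV` undeclared in the input type, so the sketch keeps both `K` and `EV`.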



Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42106061
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    --- End diff --
    
    Also, it might be useful to clarify that the edge direction is not considered.
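
As an illustration of what "edge direction is not considered" means for the result, the following is a small in-memory sketch of the idea described in the Javadoc above: build triads at the endpoint with the smaller degree and keep those closed by a third edge. It is plain Java, not the Flink dataflow from the pull request, and `TriangleSketch`, `enumerate` and `ranksBelow` are hypothetical names chosen for this example. Ties in degree are broken by vertex id, which is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TriangleSketch {

    /** Returns each triangle once as a sorted {a, b, c} array, ignoring edge direction. */
    static List<int[]> enumerate(int[][] edges) {
        // Undirected adjacency: (u, v) and (v, u) collapse into the same edge.
        Map<Integer, Set<Integer>> neighbors = new HashMap<>();
        for (int[] e : edges) {
            if (e[0] == e[1]) {
                continue; // a self-loop cannot be part of a triangle
            }
            neighbors.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            neighbors.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }

        List<int[]> triangles = new ArrayList<>();
        for (Map.Entry<Integer, Set<Integer>> entry : neighbors.entrySet()) {
            int u = entry.getKey();
            // Keep only neighbors that rank above u, so each edge is grouped
            // at its endpoint with the smaller degree.
            List<Integer> higher = new ArrayList<>();
            for (int v : entry.getValue()) {
                if (ranksBelow(u, v, neighbors)) {
                    higher.add(v);
                }
            }
            // Build triads (u; v, w) and keep those closed by an edge v-w.
            for (int i = 0; i < higher.size(); i++) {
                for (int j = i + 1; j < higher.size(); j++) {
                    int v = higher.get(i);
                    int w = higher.get(j);
                    if (neighbors.get(v).contains(w)) {
                        int[] triangle = {u, v, w};
                        Arrays.sort(triangle);
                        triangles.add(triangle);
                    }
                }
            }
        }
        return triangles;
    }

    /** True if a has the smaller degree than b; ties are broken by the smaller id. */
    static boolean ranksBelow(int a, int b, Map<Integer, Set<Integer>> neighbors) {
        int degreeA = neighbors.get(a).size();
        int degreeB = neighbors.get(b).size();
        return degreeA < degreeB || (degreeA == degreeB && a < b);
    }
}
```

Because every triangle has exactly one lowest-ranked vertex, each triangle is emitted once, and passing an edge in both directions does not change the result. The order of the returned list is not specified.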



Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42153609
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    --- End diff --
    
    Corrected.



Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147186203
  
    I have converted it to TriangleEnumerator now.



Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42113501
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    + * A triangle consists of three edges that connect three vertices with each other.
    + * <p>
    + * <p>
    + * The basic algorithm works as follows:
    + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
    + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
    + * that closes the triangle.
    + * <p>
    + * <p>
    + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
    + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
    + * reduce the number of triads.
    + * This implementation extends the basic algorithm by computing output degrees of edge vertices and
    + * grouping on edges on the vertex with the smaller degree.
    + */
    +
    +public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
    +	@Override
    +	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
    +
    +		DataSet<Edge<K, EV>> edges = input.getEdges();
    +
    +		// annotate edges with degrees
    +		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
    +				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
    +				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
    +
    +		// project edges by degrees
    +		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
    +		// project edges by vertex id
    +		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
    +
    +		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
    +				// build triads
    +				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
    +				.reduceGroup(new TriadBuilder())
    +				// filter triads
    +				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
    +
    +		return triangles;
    +	}
    +
    +	/**
    +	 * Emits for an edge the original edge and its switched version.
    +	 */
    +	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
    +
    +		@Override
    +		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
    +			out.collect(edge);
    +			Edge<K, EV> reversed = edge.reverse();
    +			out.collect(reversed);
    +		}
    +	}
    +
    +	/**
    +	 * Counts the number of edges that share a common vertex.
    +	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
    +	 * For each emitted edge, the first vertex is the vertex with the smaller id.
    +	 */
    +	private static final class DegreeCounter<K extends Comparable<K>, EV>
    --- End diff --
    
    Hey, I was actually referring to the `TriadBuilder`.



Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42105854
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java ---
    @@ -20,10 +20,12 @@
     
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
     import org.apache.flink.graph.Edge;
     import org.apache.flink.types.NullValue;
     
     import java.util.ArrayList;
    +import java.util.Arrays;
    --- End diff --
    
    unused import



Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147180010
  
    Fair enough. I will make this change in my pull request then. Could you please edit the JIRA too, so that the updated pull request and the JIRA title stay consistent?



Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42107165
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    + * A triangle consists of three edges that connect three vertices with each other.
    + * <p>
    + * <p>
    + * The basic algorithm works as follows:
    + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
    + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
    + * that closes the triangle.
    + * <p>
    + * <p>
    + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
    + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
    + * reduce the number of triads.
    + * This implementation extends the basic algorithm by computing output degrees of edge vertices and
    + * grouping on edges on the vertex with the smaller degree.
    + */
    +
    +public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
    +	@Override
    +	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
    +
    +		DataSet<Edge<K, EV>> edges = input.getEdges();
    +
    +		// annotate edges with degrees
    +		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
    +				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
    +				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
    +
    +		// project edges by degrees
    +		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
    +		// project edges by vertex id
    +		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
    +
    +		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
    +				// build triads
    +				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
    +				.reduceGroup(new TriadBuilder())
    +				// filter triads
    +				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
    +
    +		return triangles;
    +	}
    +
    +	/**
    +	 * Emits for an edge the original edge and its switched version.
    +	 */
    +	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
    +
    +		@Override
    +		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
    +			out.collect(edge);
    +			Edge<K, EV> reversed = edge.reverse();
    +			out.collect(reversed);
    +		}
    +	}
    +
    +	/**
    +	 * Counts the number of edges that share a common vertex.
    +	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
    +	 * For each emitted edge, the first vertex is the vertex with the smaller id.
    +	 */
    +	private static final class DegreeCounter<K extends Comparable<K>, EV>
    --- End diff --
    
    I mean since `edgesByDegree` and `edgesById` have both set the edge value type to `NullValue`.



Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42105999
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    --- End diff --
    
    It now returns the triangles themselves, right?



Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42153562
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    --- End diff --
    
    My bad



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1250



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147180092
  
    Sure, will do! Thank you!



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42105936
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    --- End diff --
    
    unused import



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42153585
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    --- End diff --
    
    My bad again :)



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42105949
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    --- End diff --
    
    same here :)



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42153539
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java ---
    @@ -20,10 +20,12 @@
     
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
     import org.apache.flink.graph.Edge;
     import org.apache.flink.types.NullValue;
     
     import java.util.ArrayList;
    +import java.util.Arrays;
    --- End diff --
    
    My bad, I had to optimise them.



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r41696647
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCounter.java ---
    @@ -0,0 +1,339 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    + * A triangle consists of three edges that connect three vertices with each other.
    + * <p>
    + * <p>
    + * The basic algorithm works as follows:
    + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
    + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
    + * that closes the triangle.
    + * <p>
    + * <p>
    + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
    + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
    + * reduce the number of triads.
    + * This implementation extends the basic algorithm by computing output degrees of edge vertices and
    + * grouping on edges on the vertex with the smaller degree.
    + */
    +
    +public class TriangleCounter<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
    +	@Override
    +	public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
    +
    +		DataSet<Edge<K, EV>> edges = input.getEdges();
    +
    +		// annotate edges with degrees
    +		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
    +				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
    +				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
    +
    +		// project edges by degrees
    +		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
    +		// project edges by vertex id
    +		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
    +
    +		DataSet<Integer> triangles = edgesByDegree
    +				// build triads
    +				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
    +				.reduceGroup(new TriadBuilder())
    +				// filter triads
    +				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
    --- End diff --
    
    Just FYI, the vertices of Triad and edgesWithDegrees are referred to with V1 and V2, while those of edges are referred to with indices 0 and 1.
    IMHO, org.apache.flink.graph.Edge could also have static variables V1 and V2 so that the group operators are more readable.
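
A minimal sketch of the readability point above, in plain Java rather than against the Flink API. The class `EdgeFieldsSketch`, its constants `V1` and `V2`, and the `groupBy` helper are all hypothetical names made up for illustration; edges are modelled as two-element `int[]` arrays, which is an assumption and not how Gelly represents them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EdgeFieldsSketch {
    // Hypothetical named field positions, mirroring the V1/V2 idea from the comment.
    static final int V1 = 0; // position of the first vertex id
    static final int V2 = 1; // position of the second vertex id

    /** Groups edges by the vertex id stored at the given field position. */
    static Map<Integer, List<int[]>> groupBy(List<int[]> edges, int field) {
        Map<Integer, List<int[]>> groups = new TreeMap<>();
        for (int[] edge : edges) {
            groups.computeIfAbsent(edge[field], k -> new ArrayList<>()).add(edge);
        }
        return groups;
    }
}
```

With such constants, a call like `EdgeFieldsSketch.groupBy(edges, EdgeFieldsSketch.V1)` states which vertex is the grouping key, whereas `groupBy(edges, 0)` leaves the reader to remember what position 0 holds.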



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147179792
  
    Yes @ssaumitra, if we have the enumerator, then it's a matter of calling a `count` to get the number of triangles. So, let's just go for `TriangleEnumerator` in the library. As for exposing the `Triad`, there's no need to do so. `Triad` actually extends `Tuple3`, so I would say to simply return `DataSet<Tuple3>`.



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-148988753
  
    I will add a description in the docs, squash the commits, and merge this.



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147178962
  
    Hi @ssaumitra! Thanks a lot for the pull request, it looks good.
    I was thinking, since this algorithm not only counts but actually enumerates all triangles, we could have 2 library methods, `TriangleCounter` and `TriangleEnumerator`. They would basically use the same algorithm internally, but the first would return the number of triangles, while the second would return a `DataSet` of `Tuple3`, containing the vertex IDs that form triangles. What do you think?
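
To make the counter/enumerator relationship concrete, here is a rough plain-Java sketch. It does not use Flink or Gelly. The class `TriangleSketch` and its methods are hypothetical, vertex IDs are assumed to be `int`, and it implements only the basic triad-building-and-filtering idea described in the Javadoc, grouping on the vertex with the smaller ID rather than on the vertex with the smaller degree.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class TriangleSketch {

    /** Packs an ordered vertex pair into one long so it can be stored in a set. */
    static long key(int lo, int hi) {
        return ((long) lo << 32) | (hi & 0xFFFFFFFFL);
    }

    /** Returns each triangle once as a triple {a, b, c} with a < b < c. */
    static List<int[]> enumerateTriangles(int[][] edges) {
        // Treat edges as undirected; drop self-loops and duplicates.
        Set<Long> edgeSet = new HashSet<>();
        // For every vertex, the sorted neighbors that have a larger id.
        Map<Integer, TreeSet<Integer>> higher = new HashMap<>();
        for (int[] e : edges) {
            if (e[0] == e[1]) {
                continue;
            }
            int lo = Math.min(e[0], e[1]);
            int hi = Math.max(e[0], e[1]);
            if (edgeSet.add(key(lo, hi))) {
                higher.computeIfAbsent(lo, k -> new TreeSet<>()).add(hi);
            }
        }

        List<int[]> triangles = new ArrayList<>();
        for (Map.Entry<Integer, TreeSet<Integer>> group : higher.entrySet()) {
            List<Integer> nbrs = new ArrayList<>(group.getValue());
            // Build triads: two edges sharing the group's vertex.
            for (int i = 0; i < nbrs.size(); i++) {
                for (int j = i + 1; j < nbrs.size(); j++) {
                    int a = nbrs.get(i);
                    int b = nbrs.get(j);
                    // Filter triads: keep only those closed by a third edge.
                    if (edgeSet.contains(key(a, b))) {
                        triangles.add(new int[] {group.getKey(), a, b});
                    }
                }
            }
        }
        return triangles;
    }

    /** Counting is just the size of the enumeration. */
    static int countTriangles(int[][] edges) {
        return enumerateTriangles(edges).size();
    }
}
```

In this sketch the counter adds nothing beyond taking the size of the enumerated result, so the two methods differ only in their return type.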



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-148347416
  
    Hi @ssaumitra,
    thanks a lot for updating the PR! I have left a few minor comments that should be easy to fix.



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1250#discussion_r42107077
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.example.utils.TriangleCountData;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +/**
    + * This function returns number of triangles present in the input graph.
    + * A triangle consists of three edges that connect three vertices with each other.
    + * <p>
    + * <p>
    + * The basic algorithm works as follows:
    + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
    + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
    + * that closes the triangle.
    + * <p>
    + * <p>
    + * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
    + * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
    + * reduce the number of triads.
    + * This implementation extends the basic algorithm by computing output degrees of edge vertices and
    + * grouping on edges on the vertex with the smaller degree.
    + */
    +
    +public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
    +	@Override
    +	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
    +
    +		DataSet<Edge<K, EV>> edges = input.getEdges();
    +
    +		// annotate edges with degrees
    +		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
    +				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
    +				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
    +
    +		// project edges by degrees
    +		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
    +		// project edges by vertex id
    +		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
    +
    +		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
    +				// build triads
    +				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
    +				.reduceGroup(new TriadBuilder())
    +				// filter triads
    +				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
    +
    +		return triangles;
    +	}
    +
    +	/**
    +	 * Emits for an edge the original edge and its switched version.
    +	 */
    +	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
    +
    +		@Override
    +		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
    +			out.collect(edge);
    +			Edge<K, EV> reversed = edge.reverse();
    +			out.collect(reversed);
    +		}
    +	}
    +
    +	/**
    +	 * Counts the number of edges that share a common vertex.
    +	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
    +	 * For each emitted edge, the first vertex is the vertex with the smaller id.
    +	 */
    +	private static final class DegreeCounter<K extends Comparable<K>, EV>
    --- End diff --
    
    I believe you can get rid of the EV type argument, since you're not using the edge value anywhere.



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-147179239
  
    In my opinion, if we provide TriangleEnumerator as a library function, TriangleCounter would be a trivial extension of it. So would it be necessary to add it to the library?
    Also, if we want to expose the triangle enumerator, I think we need to make Triad part of the library too (right now it's a private class).



[GitHub] flink pull request: [FLINK-2714] [Gelly] Copy triangle counting lo...

Posted by ssaumitra <gi...@git.apache.org>.
Github user ssaumitra commented on the pull request:

    https://github.com/apache/flink/pull/1250#issuecomment-148486062
  
    @vasia I have updated it. Please have a look. Also, should I be worried about the Travis build failure? It looks like it's failing on master too.

