Posted to issues@flink.apache.org by balidani <gi...@git.apache.org> on 2015/02/16 18:12:19 UTC

[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

GitHub user balidani opened a pull request:

    https://github.com/apache/flink/pull/408

    [FLINK-1514][Gelly] Add a Gather-Sum-Apply iteration method

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/balidani/flink gelly-gsa

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #408
    
----
commit ae96bfefdd09b2d26fae644f071ea882f69a2375
Author: Dániel Bali <ba...@gmail.com>
Date:   2015-02-16T14:06:05Z

    [FLINK-1514][Gelly] Add a Gather-Sum-Apply iteration method

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-93975508
  
    Hi @balidani :)
    
    This PR has not been updated in a while. Are you facing any issues? If so, I would like to help. Otherwise, if you're busy, maybe it makes sense to pass the remainder of the implementation on to someone else? 




[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26008902
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.common.functions.IterationRuntimeContext;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
    +
    +	public abstract void apply(M message, VV vertexValue);
    --- End diff --
    
    Ah, I see. That is more explicit than the alternative (returning `null` when no update is intended).



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77750264
  
    I am just dropping some more thoughts here ;-)
    
    The interface seems a bit confusing. Right now, one has to first create the iteration from the graph and then tell the graph to run it:
    ``` java
    GatherSumApplyIteration iteration = graph.createGatherSumApplyIteration(gather, sum, apply);
    Graph result = graph.runGatherSumApplyIteration(iteration);
    ```
    
    What was the reason for not simply going with
    ``` java 
    Graph result = graph.runGatherSumApplyIteration(gather, sum, apply);
    ```
    
    If the reason was additional parameterization of the iteration, would it be nicer to go with the following?
    ``` java
    GatherSumApplyIteration iteration = graph.createGatherSumApplyIteration(gather, sum, apply);
    iteration.configureInSomeWay(...);
    Graph result = iteration.result();
    ```
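A minimal sketch of the second style proposed above: the graph only creates the iteration, extra parameters are set on the iteration object, and `result()` runs it. Everything here (`GsaIteration`, `setMaxIterations`, `result()`, the in-memory `Graph` and `Edge`) is a hypothetical stand-in for illustration, not Gelly's API; it runs plain synchronous supersteps over Java maps.

```java
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical, in-memory sketch of the "configure, then result()" style.
// Not Gelly classes: vertex values and messages are Longs, edges are unweighted.
class FluentGsaSketch {

    static final class Edge {
        final long src, trg;
        Edge(long src, long trg) { this.src = src; this.trg = trg; }
    }

    static final class Graph {
        final Map<Long, Long> vertices; // vertex id -> vertex value
        final List<Edge> edges;

        Graph(Map<Long, Long> vertices, List<Edge> edges) {
            this.vertices = vertices;
            this.edges = edges;
        }

        // Only builds and returns the iteration object; nothing is executed here.
        GsaIteration createGatherSumApplyIteration(UnaryOperator<Long> gather,
                BinaryOperator<Long> sum, BinaryOperator<Long> apply) {
            return new GsaIteration(this, gather, sum, apply);
        }
    }

    static final class GsaIteration {
        private final Graph graph;
        private final UnaryOperator<Long> gather; // source value -> message
        private final BinaryOperator<Long> sum;    // (message, message) -> message
        private final BinaryOperator<Long> apply;  // (current value, summed) -> new value
        private int maxIterations = 100;

        GsaIteration(Graph graph, UnaryOperator<Long> gather,
                     BinaryOperator<Long> sum, BinaryOperator<Long> apply) {
            this.graph = graph;
            this.gather = gather;
            this.sum = sum;
            this.apply = apply;
        }

        // Extra parameterization lives on the iteration and returns it for chaining.
        GsaIteration setMaxIterations(int maxIterations) {
            this.maxIterations = maxIterations;
            return this;
        }

        // Runs synchronous supersteps until no vertex value changes or the limit is hit.
        Graph result() {
            Map<Long, Long> values = new HashMap<>(graph.vertices);
            for (int i = 0; i < maxIterations; i++) {
                Map<Long, Long> summed = new HashMap<>();
                for (Edge e : graph.edges) {
                    summed.merge(e.trg, gather.apply(values.get(e.src)), sum);
                }
                boolean changed = false;
                for (Map.Entry<Long, Long> s : summed.entrySet()) {
                    Long old = values.get(s.getKey());
                    Long updated = apply.apply(old, s.getValue());
                    if (!updated.equals(old)) {
                        values.put(s.getKey(), updated);
                        changed = true;
                    }
                }
                if (!changed) {
                    break;
                }
            }
            return new Graph(values, graph.edges);
        }
    }

    // Usage: minimum-label propagation (connected components) on two undirected pairs.
    static Map<Long, Long> example() {
        Map<Long, Long> vertices = new HashMap<>();
        for (long id = 1; id <= 4; id++) {
            vertices.put(id, id); // each vertex starts with its own id as label
        }
        List<Edge> edges = Arrays.asList(
                new Edge(1, 2), new Edge(2, 1), new Edge(3, 4), new Edge(4, 3));
        Graph graph = new Graph(vertices, edges);

        GsaIteration iteration = graph.createGatherSumApplyIteration(v -> v, Long::min, Long::min);
        iteration.setMaxIterations(10);
        return iteration.result().vertices;
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

In the example, vertices 1 and 2 end up with label 1 and vertices 3 and 4 with label 3. Returning `this` from the setter lets configuration calls be chained before `result()`.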




[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26066814
  
    --- Diff: flink-staging/flink-gelly/pom.xml ---
    @@ -52,4 +52,35 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     	</dependencies>
    +
    +    <!-- See main pom.xml for explanation of profiles -->
    +    <profiles>
    +        <profile>
    +            <id>hadoop-1</id>
    +            <activation>
    +                <property>
    +                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
    +                </property>
    +            </activation>
    +            <dependencies>
    +                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
    +                <dependency>
    +                    <groupId>com.google.guava</groupId>
    +                    <artifactId>guava</artifactId>
    +                    <version>${guava.version}</version>
    +                    <scope>provided</scope>
    +                </dependency>
    +            </dependencies>
    +        </profile>
    +        <profile>
    --- End diff --
    
    Thank you.
    I've rebased https://github.com/apache/flink/pull/454 onto your changes in master and adapted it to our new approach of handling dependencies in Flink. There, I've added guava again as a regular dependency.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77774618
  
    - Regarding annotations, I actually tried as many as I could find :)) [This is the commit](https://github.com/vasia/flink/blob/653cb21981a80c5790ca29804e39e4c3642aee2d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java). I haven't looked at the execution plans, but my annotations didn't make a big difference in the runtime of my experiments.
    - As for having triplets in the solution set, I also thought about this. The problem is that you would then still have to rebuild them in the solution set delta at the end of every iteration (in order to update the solution set). Unless you have something in mind that avoids this?
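To make that cost concrete: a triplet holds copies of its two endpoint values, so when one vertex changes, every triplet incident to it is stale and has to be rebuilt in the delta. The sketch below assumes simplified in-memory `Edge` and `Triplet` types (hypothetical, not Gelly classes) and collects the triplets a delta would need after a single vertex update.

```java
import java.util.*;

// Hypothetical illustration with simplified stand-in types (not Gelly classes):
// a triplet stores copies of its endpoint values, so every vertex update makes
// all triplets incident to that vertex stale.
class TripletRebuildSketch {

    static final class Edge {
        final long src, trg;
        final double value;
        Edge(long src, long trg, double value) { this.src = src; this.trg = trg; this.value = value; }
    }

    static final class Triplet {
        final Edge edge;
        final double srcValue, trgValue; // snapshots taken when the triplet was built
        Triplet(Edge edge, double srcValue, double trgValue) {
            this.edge = edge;
            this.srcValue = srcValue;
            this.trgValue = trgValue;
        }
    }

    // Rebuild every triplet touching an updated vertex: what a solution-set delta
    // of triplets would have to contain after the superstep.
    static List<Triplet> deltaTriplets(Map<Long, Double> values, List<Edge> edges, Set<Long> updated) {
        List<Triplet> delta = new ArrayList<>();
        for (Edge e : edges) {
            if (updated.contains(e.src) || updated.contains(e.trg)) {
                delta.add(new Triplet(e, values.get(e.src), values.get(e.trg)));
            }
        }
        return delta;
    }

    static List<Triplet> example() {
        // Vertex 0 has edges to 1, 2 and 3; there is one more edge 1 -> 2.
        List<Edge> edges = Arrays.asList(
                new Edge(0, 1, 1.0), new Edge(0, 2, 1.0), new Edge(0, 3, 1.0), new Edge(1, 2, 1.0));
        Map<Long, Double> values = new HashMap<>();
        for (long id = 0; id <= 3; id++) {
            values.put(id, Double.POSITIVE_INFINITY);
        }
        values.put(0L, 0.0); // a single vertex update
        return deltaTriplets(values, edges, Collections.singleton(0L));
    }

    public static void main(String[] args) {
        System.out.println(example().size()); // 3: one rebuilt triplet per incident edge
    }
}
```

Updating only vertex 0 forces three triplets into the delta, so the delta grows with the degree of the changed vertices rather than with their number.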



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25256087
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    +
    +			if (triplet.f2.getValue() == 0.0) {
    +				return new Tuple2<Long, Double>(triplet.f2.getId(), 0.0);
    +			}
    --- End diff --
    
    When I do this, I get Infinity for vertices 4 and 5. I suppose it's because they are not updated in the first iteration, so they are no longer active vertices?
    
    When I change the apply function to use `<` and remove this part from gather, I get huge values (400+), since the iteration never converges and only stops once `maxIterations` is reached. 
    
    I'll try to find a solution :)
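One way to get termination without special-casing the source in gather is sketched below, under assumptions that are not taken from the PR: every vertex is active in the first superstep (so the source's 0.0 is propagated at all), afterwards only vertices whose distance strictly decreased stay active, and the iteration stops when that set is empty. `GsaSsspSketch` and its array-based graph encoding are hypothetical.

```java
import java.util.*;

// Hypothetical, in-memory sketch of GSA-style SSSP over a workset of "active" vertices.
// Not the PR's implementation: gather only reads edges whose source improved in the
// previous superstep, and apply only counts a vertex as updated on a strict decrease.
class GsaSsspSketch {

    static Map<Long, Double> sssp(long[] vertexIds, long[][] edges, double[] weights,
                                  long source, int maxIterations) {
        Map<Long, Double> dist = new HashMap<>();
        for (long id : vertexIds) {
            dist.put(id, id == source ? 0.0 : Double.POSITIVE_INFINITY);
        }

        // In the first superstep every vertex is active, so the source's 0.0 propagates.
        Set<Long> active = new HashSet<>(dist.keySet());
        for (int it = 0; it < maxIterations && !active.isEmpty(); it++) {
            // Gather + sum: best candidate distance per target, over active sources only.
            Map<Long, Double> candidates = new HashMap<>();
            for (int i = 0; i < edges.length; i++) {
                long src = edges[i][0], trg = edges[i][1];
                if (active.contains(src)) {
                    candidates.merge(trg, dist.get(src) + weights[i], Double::min);
                }
            }
            // Apply: keep only strict improvements; they form the next workset.
            Set<Long> updated = new HashSet<>();
            for (Map.Entry<Long, Double> c : candidates.entrySet()) {
                if (c.getValue() < dist.get(c.getKey())) {
                    dist.put(c.getKey(), c.getValue());
                    updated.add(c.getKey());
                }
            }
            active = updated;
        }
        return dist;
    }

    static Map<Long, Double> example() {
        long[] ids = {1, 2, 3, 4, 5};
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}, {4, 5}, {2, 5}};
        double[] weights = {1.0, 4.0, 2.0, 1.0, 3.0, 10.0};
        return sssp(ids, edges, weights, 1L, 10);
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

On the example graph this yields distances 0, 1, 3, 4 and 7 for vertices 1 to 5 and stops after five supersteps, before `maxIterations` is reached. The sketch makes the "active vertex" behaviour explicit: a vertex only sends candidates in the superstep right after its own value improved.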



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645524
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		// Execute the GSA iteration
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
    +				gather, sum, apply, maxIterations);
    +		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Double, Double, Double> {
    +		@Override
    +		public Double gather(Triplet<Double, Double> triplet) {
    +			return triplet.getSource() + triplet.getEdge();
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathSum
    +			extends SumFunction<Double, Double, Double> {
    +		@Override
    +		public Double sum(Double arg0, Double arg1) {
    +			return Math.min(arg0, arg1);
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathApply
    +			extends ApplyFunction<Double, Double, Double> {
    +		@Override
    +		public void apply(Double summed, Double target) {
    --- End diff --
    
    How about this?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645478
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -914,7 +919,7 @@ public long numberOfVertices() throws Exception {
     	}
     
     	/**
    -	 * @return a long integer representing the number of edges
    +	 * @return Singleton DataSet containing the edge count
    --- End diff --
    
    this has changed... did you manually resolve any conflicts?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-84033474
  
    @vasia was completely right about the GGC algorithm; I had misunderstood what it was supposed to do. I implemented the correct version, but after reading the GAS paper it turns out that this algorithm has problems when executed synchronously (e.g., values can oscillate between 0 and 1). We had a discussion and decided to add a Connected Components example instead, as well as a PageRank example, which I will implement shortly.
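The oscillation is easy to reproduce. The sketch assumes the coloring rule "take the smallest color not used by any neighbor" and applies it synchronously, so both endpoints of an edge read each other's old colors and update at the same time. `SyncColoringOscillation` is a hypothetical illustration, not the PR's code.

```java
import java.util.*;

// Hypothetical sketch (not the PR's code): synchronous greedy coloring where every
// vertex picks the smallest color not used by its neighbors in the previous superstep.
class SyncColoringOscillation {

    static int smallestUnused(Set<Integer> used) {
        int color = 0;
        while (used.contains(color)) {
            color++;
        }
        return color;
    }

    // One synchronous superstep: all vertices read the old colors, then update together.
    static int[] superstep(int[] colors, int[][] neighbors) {
        int[] next = new int[colors.length];
        for (int v = 0; v < colors.length; v++) {
            Set<Integer> used = new HashSet<>();
            for (int u : neighbors[v]) {
                used.add(colors[u]);
            }
            next[v] = smallestUnused(used);
        }
        return next;
    }

    public static void main(String[] args) {
        int[][] neighbors = {{1}, {0}}; // a single edge between vertices 0 and 1
        int[] colors = {0, 0};
        for (int step = 1; step <= 4; step++) {
            colors = superstep(colors, neighbors);
            System.out.println(Arrays.toString(colors)); // [1, 1], [0, 0], [1, 1], [0, 0]
        }
    }
}
```

Starting from `[0, 0]`, the two vertices flip together between `[1, 1]` and `[0, 0]` and never reach a valid coloring.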



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255250
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    --- End diff --
    
    Makes sense, I was blindly copying this part and didn't realize it's not really needed :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25389859
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    --- End diff --
    
    this isn't the case anymore, you can delete this comment :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-75770438
  
    This looks great already! We just need a few more changes: 
    - you can add the `Triplet` type inside the gsa package. This should just be a simple wrapper around `Tuple3<VV, EV, VV>`. (I think we don't even need the Vertex IDs here, just the values).
    
    - I see you have exposed the vertex keys in the UDFs. These should be hidden from the user-facing methods, e.g. in the SSSP example `gather()` should return `Double`, not `Tuple2<Long, Double>`, `sum()` should get two Doubles and return a Double (the minimum) and `apply()` should get a Double (the current value) and an M (the accumulator result) and return a Double.
    More specifically, the UDFs should be defined like this:
    - `gather: Triplet -> M`
    - `sum: <M, M> -> M`
    - `apply: <VV, M> -> VV`
    
    Of course the wrappers inside `GatherSumApplyIteration` (`GatherUDF`, `SumUDF`, `ApplyUDF`) will have to maintain the keys. The point is to hide them from the user :)
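A minimal sketch of the proposed signatures, assuming Java interfaces in place of the PR's abstract classes: `Triplet` wraps only the three values, the user-facing `gather`, `sum` and `apply` never see a key, and a framework-side helper (the hypothetical `gatherAndSum`) keeps the target id next to each message. The SSSP functions mirror the ones discussed for the example; none of this is the PR's actual code.

```java
import java.io.Serializable;
import java.util.*;

// Hypothetical sketch of the proposed user-facing signatures (simplified stand-ins).
class GsaSignaturesSketch {

    // Plain wrapper around (source value, edge value, target value); no vertex ids.
    static final class Triplet<VV, EV> implements Serializable {
        private final VV source;
        private final EV edge;
        private final VV target;

        Triplet(VV source, EV edge, VV target) {
            this.source = source;
            this.edge = edge;
            this.target = target;
        }

        VV getSource() { return source; }
        EV getEdge() { return edge; }
        VV getTarget() { return target; }
    }

    interface GatherFunction<VV, EV, M> extends Serializable { M gather(Triplet<VV, EV> triplet); } // Triplet -> M
    interface SumFunction<M> extends Serializable { M sum(M a, M b); }                              // <M, M> -> M
    interface ApplyFunction<VV, M> extends Serializable { VV apply(VV value, M summed); }           // <VV, M> -> VV

    // SSSP with these signatures: only Doubles, no Long keys.
    static final class SsspGather implements GatherFunction<Double, Double, Double> {
        public Double gather(Triplet<Double, Double> t) { return t.getSource() + t.getEdge(); }
    }
    static final class SsspSum implements SumFunction<Double> {
        public Double sum(Double a, Double b) { return Math.min(a, b); }
    }
    static final class SsspApply implements ApplyFunction<Double, Double> {
        public Double apply(Double value, Double summed) { return Math.min(value, summed); }
    }

    // Framework side: the wrapper, not the user, carries the target key with each triplet.
    static <K, VV, EV, M> Map<K, M> gatherAndSum(List<Map.Entry<K, Triplet<VV, EV>>> keyed,
            GatherFunction<VV, EV, M> gather, SumFunction<M> sum) {
        Map<K, M> result = new HashMap<>();
        for (Map.Entry<K, Triplet<VV, EV>> kt : keyed) {
            result.merge(kt.getKey(), gather.gather(kt.getValue()), sum::sum);
        }
        return result;
    }

    static Map<Long, Double> example() {
        double inf = Double.POSITIVE_INFINITY;
        List<Map.Entry<Long, Triplet<Double, Double>>> keyed = new ArrayList<>();
        keyed.add(new AbstractMap.SimpleEntry<>(3L, new Triplet<>(0.0, 5.0, inf)));
        keyed.add(new AbstractMap.SimpleEntry<>(3L, new Triplet<>(1.0, 2.0, inf)));
        keyed.add(new AbstractMap.SimpleEntry<>(2L, new Triplet<>(0.0, 1.0, inf)));
        return gatherAndSum(keyed, new SsspGather(), new SsspSum());
    }

    public static void main(String[] args) {
        Map<Long, Double> summed = example();
        // Apply lowers target 3's current value (infinity) to its summed message.
        System.out.println(new SsspApply().apply(Double.POSITIVE_INFINITY, summed.get(3L))); // 3.0
    }
}
```

For target 3 the two candidate paths cost 5.0 and 3.0, and `sum` keeps 3.0; the keys only ever appear in `gatherAndSum`, never in the three user functions.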



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-82588861
  
    Hi @balidani, thanks a lot for the changes! I'll try to run some tests on a cluster soon and test the new version.
    I have a question about the graph coloring example. As far as I understand, in this algorithm, colors are represented by numbers (vertex values) and the goal is to color the graph with the minimum number of colors, so that no two neighboring vertices have the same color. So, in each superstep, each vertex gathers the colors of its neighbors in a set and then assigns itself the minimum color that isn't in this set. However, it seems to me that in your implementation, you're only propagating the minimum neighbor value, similar to what you would do in connected components. Is there anything I'm missing here?
    Thanks!
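A minimal sketch of the per-vertex logic described in the question, written in plain Java rather than against the Gelly API (the class and method names are hypothetical): gather emits a neighbor's color, sum takes the union of color sets, and apply picks the smallest color not in the set. Colors are `long` so that `Set<Long>.contains` compares against boxed `Long` values.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical plain-Java sketch of greedy-coloring GSA functions (not the Gelly API).
class GreedyColoringGsaSketch {

    /** gather: a neighbor's color, as a singleton set. */
    static Set<Long> gather(long neighborColor) {
        Set<Long> colors = new HashSet<>();
        colors.add(neighborColor);
        return colors;
    }

    /** sum: union of two partial sets of neighbor colors. */
    static Set<Long> sum(Set<Long> left, Set<Long> right) {
        Set<Long> union = new HashSet<>(left);
        union.addAll(right);
        return union;
    }

    /**
     * apply: the smallest non-negative color not used by any neighbor.
     * currentColor is unused; it is kept only to match the <VV, M> -> VV shape.
     */
    static long apply(long currentColor, Set<Long> neighborColors) {
        long color = 0;
        while (neighborColors.contains(color)) { // autoboxes to Long, matching the set's elements
            color++;
        }
        return color;
    }

    public static void main(String[] args) {
        Set<Long> seen = sum(sum(gather(0), gather(1)), gather(3));
        System.out.println(apply(5, seen)); // prints 2
        // Propagating only the minimum neighbor value (connected-components style) would give 0 here.
    }
}
```

Note that in a synchronous superstep two adjacent vertices may pick the same color at the same time, so a complete parallel coloring would likely need some tie-breaking rule (for example by vertex ID); that part is not shown here.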



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255094
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    +
    +			if (triplet.f2.getValue() == 0.0) {
    +				return new Tuple2<Long, Double>(triplet.f2.getId(), 0.0);
    +			}
    --- End diff --
    
    I needed this to terminate the iteration at one point. In the sample data, this is only triggered on the triplet `((5, X), (5, 1, 51), (1, 0))`, where `X` is the currently known shortest distance from `(1)` to `(5)`. Without this check, the value for `(1)` is overwritten with `X + 51`.
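A minimal, stdlib-only Java simulation (not Gelly code) of why the special case may be avoidable: if apply only accepts a candidate distance that is smaller than the current one, the source keeps 0 even when the triplet over the edge (5, 1, 51) proposes `X + 51`. The five-vertex graph is a hypothetical example that includes that edge; `GsaSsspSketch`, `sssp` and the `maxIterations` value are illustrative. The loop mimics a delta iteration: only vertices updated in the previous superstep propagate along their out-edges, and it stops when no distance changes or `maxIterations` is reached.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation of GSA single-source shortest paths (illustrative, not Gelly code).
class GsaSsspSketch {

    /**
     * @param edges   each row is {sourceId, targetId}
     * @param weights weights[i] is the weight of edges[i]
     */
    static Map<Long, Double> sssp(Set<Long> vertices, long[][] edges, double[] weights,
                                  long srcId, int maxIterations) {
        Map<Long, Double> dist = new HashMap<>();
        for (long v : vertices) {
            dist.put(v, v == srcId ? 0.0 : Double.POSITIVE_INFINITY);
        }
        Set<Long> workset = new HashSet<>(vertices); // initially every vertex is "updated"

        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // gather + sum: candidate distances per target, reduced with min
            Map<Long, Double> summed = new HashMap<>();
            for (int e = 0; e < edges.length; e++) {
                long src = edges[e][0];
                long trg = edges[e][1];
                if (workset.contains(src)) {
                    summed.merge(trg, dist.get(src) + weights[e], Math::min);
                }
            }
            // apply: accept a candidate only if it improves the current distance
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Double> m : summed.entrySet()) {
                if (m.getValue() < dist.get(m.getKey())) {
                    dist.put(m.getKey(), m.getValue());
                    changed.add(m.getKey());
                }
            }
            workset = changed; // only updated vertices propagate in the next superstep
        }
        return dist;
    }

    public static void main(String[] args) {
        Set<Long> vertices = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}, {3, 5}, {4, 5}, {5, 1}};
        double[] weights = {12, 13, 23, 34, 35, 45, 51};
        Map<Long, Double> dist = sssp(vertices, edges, weights, 1L, 10);
        System.out.println(dist.get(1L)); // 0.0: the candidate 48.0 + 51.0 over (5, 1) is rejected
        System.out.println(dist.get(5L)); // 48.0
    }
}
```

On this graph, an apply that overwrote unconditionally would replace the source's 0.0 with 99.0 in the third superstep; the `<` comparison in apply is what keeps it at 0 without a special case in gather.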



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26005153
  
    --- Diff: flink-staging/flink-gelly/pom.xml ---
    @@ -52,4 +52,35 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     	</dependencies>
    +
    +    <!-- See main pom.xml for explanation of profiles -->
    +    <profiles>
    +        <profile>
    +            <id>hadoop-1</id>
    +            <activation>
    +                <property>
    +                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
    +                </property>
    +            </activation>
    +            <dependencies>
    +                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
    +                <dependency>
    +                    <groupId>com.google.guava</groupId>
    +                    <artifactId>guava</artifactId>
    +                    <version>${guava.version}</version>
    +                    <scope>provided</scope>
    +                </dependency>
    +            </dependencies>
    +        </profile>
    +        <profile>
    --- End diff --
    
    I don't see why we need the hadoop2 profile here. It doesn't do anything.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-75249793
  
    Hi @balidani! Thanks a lot for this PR! Gather-Sum-Apply will be an awesome addition to Gelly ^^
    Here come my comments:
    
    - There's no need for the Gather, Sum and Apply functions to implement MapFunction, FlatJoinFunction, etc., since they are wrapped inside those in the GatherSumApplyIteration class. Actually, I would use the Rich* versions instead, so that we can have access to the open() and close() methods. You can look at how `VertexCentricIteration` wraps the `VertexUpdateFunction` inside a `RichCoGroupFunction`.
    
    - With this small change, we could also allow access to aggregators and broadcast sets. This should be straightforward to add (again, look at `VertexCentricIteration` for hints). We should also add `getName()`, `setName()`, `getParallelism()`, `setParallelism()` methods to `GatherSumApplyIteration`.
    
    - Finally, it'd be great if you could add the tests you have as examples, i.e. one for Greedy Graph Coloring and one for GSAShortestPaths.
    
    Let me know if you have any doubts!
    
    Thanks again :sunny:
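A minimal sketch of the wrapping pattern suggested above, in plain Java without Flink: a hypothetical `GatherUdf` stands in for a rich function and forwards its `open()`/`close()` calls to `preSuperstep()`/`postSuperstep()` hooks on the user's function, so the user class never implements `MapFunction` itself. The hook names and the superstep counter are assumptions for illustration, loosely modeled on the vertex-centric update function; none of these classes are Gelly's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of wrapping a user function in a rich-function-like wrapper (not Flink/Gelly API).
class RichWrapperSketch {

    /** Hypothetical user-facing gather function with optional lifecycle hooks. */
    abstract static class GatherFunction<T, M> {
        private int superstep;

        void setSuperstep(int superstep) { this.superstep = superstep; }

        protected int getSuperstepNumber() { return superstep; }

        public void preSuperstep() {}   // called from the wrapper's open()

        public void postSuperstep() {}  // called from the wrapper's close()

        public abstract M gather(T triplet);
    }

    /** Stand-in for a RichMapFunction wrapper around the user's function. */
    static final class GatherUdf<T, M> {
        private final GatherFunction<T, M> userFunction;

        GatherUdf(GatherFunction<T, M> userFunction) { this.userFunction = userFunction; }

        void open(int superstep) {
            userFunction.setSuperstep(superstep);
            userFunction.preSuperstep();
        }

        M map(T triplet) { return userFunction.gather(triplet); }

        void close() { userFunction.postSuperstep(); }
    }

    /** Drives one superstep the way a runtime would: open, map every record, close. */
    static <T, M> List<M> runSuperstep(GatherUdf<T, M> udf, int superstep, List<T> input) {
        udf.open(superstep);
        List<M> output = new ArrayList<>();
        for (T record : input) {
            output.add(udf.map(record));
        }
        udf.close();
        return output;
    }

    /** Example user function that records when its hooks run. */
    static final class LoggingGather extends GatherFunction<Double, Double> {
        final List<String> log = new ArrayList<>();

        @Override public void preSuperstep() { log.add("pre " + getSuperstepNumber()); }

        @Override public void postSuperstep() { log.add("post " + getSuperstepNumber()); }

        @Override public Double gather(Double value) { return value + 1.0; }
    }

    public static void main(String[] args) {
        LoggingGather fn = new LoggingGather();
        List<Double> out = runSuperstep(new GatherUdf<>(fn), 3, Arrays.asList(1.0, 2.0));
        System.out.println(out + " " + fn.log); // prints [2.0, 3.0] [pre 3, post 3]
    }
}
```

Keeping the lifecycle in the wrapper means features such as aggregators or broadcast sets could later be exposed through the abstract base class without changing the user's `gather` signature.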



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25254691
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    +
    +			if (triplet.f2.getValue() == 0.0) {
    +				return new Tuple2<Long, Double>(triplet.f2.getId(), 0.0);
    +			}
    --- End diff --
    
    why is this needed?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25853291
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java ---
    @@ -0,0 +1,374 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.commons.lang3.Validate;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.RichFlatJoinFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.functions.RichReduceFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.operators.CustomUnaryOperation;
    +import org.apache.flink.api.java.operators.DeltaIteration;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
    + *
    + * @param <K> The type of the vertex key in the graph
    + * @param <VV> The type of the vertex value in the graph
    + * @param <EV> The type of the edge value in the graph
    + * @param <M> The intermediate type used by the gather, sum and apply functions
    + */
    +public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
    +		VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation<Vertex<K, VV>,
    +		Vertex<K, VV>> {
    +
    +	private DataSet<Vertex<K, VV>> vertexDataSet;
    +	private DataSet<Edge<K, EV>> edgeDataSet;
    +
    +	private final GatherFunction<VV, EV, M> gather;
    +	private final SumFunction<VV, EV, M> sum;
    +	private final ApplyFunction<VV, EV, M> apply;
    +	private final int maximumNumberOfIterations;
    +
    +	private String name;
    +	private int parallelism = -1;
    +
    +	// ----------------------------------------------------------------------------------
    +
    +	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
    +			ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
    +
    +		Validate.notNull(gather);
    +		Validate.notNull(sum);
    +		Validate.notNull(apply);
    +		Validate.notNull(edges);
    +		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
    +
    +		this.gather = gather;
    +		this.sum = sum;
    +		this.apply = apply;
    +		this.edgeDataSet = edges;
    +		this.maximumNumberOfIterations = maximumNumberOfIterations;
    +	}
    +
    +
    +	/**
    +	 * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages.
    +	 *
    +	 * @param name The name for the iteration.
    +	 */
    +	public void setName(String name) {
    +		this.name = name;
    +	}
    +
    +	/**
    +	 * Gets the name from this gather-sum-apply iteration.
    +	 *
    +	 * @return The name of the iteration.
    +	 */
    +	public String getName() {
    +		return name;
    +	}
    +
    +	/**
    +	 * Sets the degree of parallelism for the iteration.
    +	 *
    +	 * @param parallelism The degree of parallelism.
    +	 */
    +	public void setParallelism(int parallelism) {
    +		Validate.isTrue(parallelism > 0 || parallelism == -1,
    +				"The degree of parallelism must be positive, or -1 (use default).");
    +		this.parallelism = parallelism;
    +	}
    +
    +	/**
    +	 * Gets the iteration's degree of parallelism.
    +	 *
    +	 * @return The iteration's parallelism, or -1, if not set.
    +	 */
    +	public int getParallelism() {
    +		return parallelism;
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Custom Operator behavior
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Sets the input data set for this operator. In the case of this operator this input data set represents
    +	 * the set of vertices with their initial state.
    +	 *
    +	 * @param dataSet The input data set, which in the case of this operator represents the set of
    +	 *                vertices with their initial state.
    +	 */
    +	@Override
    +	public void setInput(DataSet<Vertex<K, VV>> dataSet) {
    +		this.vertexDataSet = dataSet;
    +	}
    +
    +	/**
    +	 * Computes the results of the gather-sum-apply iteration
    +	 *
    +	 * @return The resulting DataSet
    +	 */
    +	@Override
    +	public DataSet<Vertex<K, VV>> createResult() {
    +		if (vertexDataSet == null) {
    +			throw new IllegalStateException("The input data set has not been set.");
    +		}
    +
    +		// Prepare type information
    +		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
    +		TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
    +		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
    +		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
    +
    +		// Prepare UDFs
    +		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
    +		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
    +		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
    +
    +		final int[] zeroKeyPos = new int[] {0};
    +		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
    +				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
    +
    +		// Prepare triplets
    +		DataSet<Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>>> triplets = iteration
    +				.getWorkset()
    +				.join(edgeDataSet)
    +					.where(0)
    +					.equalTo(0)
    +					.with(new PairJoinFunction<K, VV, EV>())
    +				.join(iteration.getSolutionSet())
    +					.where(0)
    +					.equalTo(0)
    +					.with(new TripletJoinFunction<K, VV, EV>());
    +
    +		// Gather, sum and apply
    +		DataSet<Tuple2<K, M>> gatheredSet = triplets.map(gatherUdf);
    +		DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
    +		DataSet<Vertex<K, VV>> appliedSet = summedSet
    +				.join(iteration.getSolutionSet())
    +				.where(0)
    +				.equalTo(0)
    +				.with(applyUdf);
    +
    +		return iteration.closeWith(appliedSet, appliedSet);
    +	}
    +
    +	/**
    +	 * Creates a new gather-sum-apply iteration operator for graphs
    +	 *
    +	 * @param edges The edge DataSet
    +	 *
    +	 * @param gather The gather function of the GSA iteration
    +	 * @param sum The sum function of the GSA iteration
    +	 * @param apply The apply function of the GSA iteration
    +	 *
    +	 * @param maximumNumberOfIterations The maximum number of iterations executed
    +	 *
    +	 * @param <K> The type of the vertex key in the graph
    +	 * @param <VV> The type of the vertex value in the graph
    +	 * @param <EV> The type of the edge value in the graph
    +	 * @param <M> The intermediate type used by the gather, sum and apply functions
    +	 *
    +	 * @return An instance of the gather-sum-apply graph computation operator.
    +	 */
    +	public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
    +			GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
    +			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
    +			int maximumNumberOfIterations) {
    +		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Triplet Utils
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class PairJoinFunction<K extends Comparable<K> & Serializable, VV extends Serializable,
    +			EV extends Serializable> implements FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>,
    +			Tuple3<K, Vertex<K, VV>, Edge<K, EV>>> {
    +
    +		@Override
    +		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge,
    +				Collector<Tuple3<K, Vertex<K, VV>, Edge<K, EV>>> collector) throws Exception {
    +			collector.collect(new Tuple3<K, Vertex<K, VV>, Edge<K, EV>>(edge.getTarget(), vertex, edge));
    +		}
    +	}
    +
    +	private static final class TripletJoinFunction<K extends Comparable<K> & Serializable, VV extends Serializable,
    +			EV extends Serializable> implements  FlatJoinFunction<Tuple3<K, Vertex<K, VV>, Edge<K, EV>>, Vertex<K, VV>,
    +			Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>>> {
    +		@Override
    +		public void join(Tuple3<K, Vertex<K, VV>, Edge<K, EV>> vertexEdge, Vertex<K, VV> vertex,
    +				Collector<Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>>> out) throws Exception {
    +			out.collect(new Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>>(
    +					vertexEdge.f1, vertexEdge.f2, vertex
    +			));
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Wrapping UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
    +			EV extends Serializable, M> extends RichMapFunction<Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>>,
    +			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
    +
    +		private final GatherFunction<VV, EV, M> gatherFunction;
    +		private transient TypeInformation<Tuple2<K, M>> resultType;
    +
    +		private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
    +			this.gatherFunction = gatherFunction;
    +			this.resultType = resultType;
    +		}
    +
    +		@Override
    +		public Tuple2<K, M> map(Tuple3<Vertex<K, VV>, Edge<K, EV>, Vertex<K, VV>> triplet) throws Exception {
    +			Triplet<VV, EV> userTriplet = new Triplet<VV, EV>(triplet.f0.getValue(),
    +					triplet.f1.getValue(), triplet.f2.getValue());
    +
    +			K key = triplet.f2.getId();
    +			M result = this.gatherFunction.gather(userTriplet);
    +			return new Tuple2<K, M>(key, result);
    +		}
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    --- End diff --
    
    @vasia,
    
    Perhaps we should check whether the superstep number bug in `VertexCentricIteration` is also reproducible here, because from the looks of this, it should be...



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-93977860
  
    Hi @andralungu!
    
    I just talked to Vasia, and I'll do a rebase today.
    Cheers!



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-76083807
  
    Thanks for the revision @balidani! I added some minor inline comments.
    I'd like someone else to also review this before we merge.
    In the meantime, I'll try to find some time and test this on a cluster :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25852832
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.util.HashSet;
    +
    +/**
    + * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration
    + */
    +public class GSAGreedyGraphColoringExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// Gather the target vertices into a one-element set
    +		GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
    +
    +		// Merge the sets between neighbors
    +		SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
    +
    +		// Find the minimum vertex id in the set which will be propagated
    +		ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
    +
    +		// Execute the GSA iteration
    +		GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
    +				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
    +		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
    +
    +		// emit result
    +		if (fileOutput) {
    +			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			greedyGraphColoring.print();
    +		}
    +
    +		env.execute("GSA Greedy Graph Coloring");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Greedy Graph Coloring UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class GreedyGraphColoringGather
    +			extends GatherFunction<Double, Double, HashSet<Double>> {
    +		@Override
    +		public HashSet<Double> gather(Triplet<Double, Double> triplet) {
    +
    +			HashSet<Double> result = new HashSet<Double>();
    +			result.add(triplet.getSrcVertexValue());
    +
    +			return result;
    +		}
    +	};
    +
    +	private static final class GreedyGraphColoringSum
    +			extends SumFunction<Double, Double, HashSet<Double>> {
    +		@Override
    +		public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
    +
    +			HashSet<Double> result = new HashSet<Double>();
    +			result.addAll(newValue);
    +			result.addAll(currentValue);
    +
    +			return result;
    +		}
    +	};
    +
    +	private static final class GreedyGraphColoringApply
    +			extends ApplyFunction<Double, Double, HashSet<Double>> {
    +		@Override
    +		public void apply(HashSet<Double> set, Double src) {
    +			double minValue = src;
    +			for (Double d : set) {
    +				if (d < minValue) {
    +					minValue = d;
    +				}
    +			}
    +
    +			// This is the condition that enables the termination of the iteration
    +			if (minValue < src) {
    +				setResult(minValue);
    +			}
    +		}
    +	};
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Util methods
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static boolean fileOutput = false;
    +	private static String vertexInputPath = null;
    +	private static String edgeInputPath = null;
    +	private static String outputPath = null;
    +
    +	private static int maxIterations = 16;
    +
    +	private static boolean parseParameters(String[] args) {
    +
    +		if(args.length > 0) {
    +			// parse input arguments
    +			fileOutput = true;
    +
    +			if(args.length != 4) {
    +				System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
    +						"<result path> <max iterations>");
    +				return false;
    +			}
    +
    +			vertexInputPath = args[0];
    +			edgeInputPath = args[1];
    +			outputPath = args[2];
    +			maxIterations = Integer.parseInt(args[3]);
    +		} else {
    +			System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
    +			System.out.println("  Provide parameters to read input data from files.");
    +			System.out.println("  See the documentation for the correct format of input files.");
    +			System.out.println("  Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
    +					+ "<result path> <max iterations>");
    +		}
    +		return true;
    +	}
    +
    +	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
    +		if(fileOutput) {
    +			return env
    +					.readCsvFile(vertexInputPath)
    +					.fieldDelimiter(" ")
    +					.lineDelimiter("\n")
    +					.types(Long.class, Double.class)
    +					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
    +						@Override
    +						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
    +							return new Vertex<Long, Double>(value.f0, value.f1);
    +						}
    +					});
    +		} else {
    +			return ExampleUtils.getLongDoubleVertexData(env);
    --- End diff --
    
    This is just a cosmetic bug, but I think it could be useful to mention it... 
    Could you take the data from a separate class instead of ExampleUtils? For SSSP, for example, you have SingleSourceShortestPathsData in example/utils. This would be more legible (i.e. it would look nicer than getLongLongVertexData :) ).
    Also @vasia, I see nobody else is using that method in ExampleUtils, so maybe after this we can get rid of it :D



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645450
  
    --- Diff: flink-staging/flink-gelly/pom.xml ---
    @@ -57,4 +57,35 @@ under the License.
     			<version>${guava.version}</version>
     		</dependency>
     	</dependencies>
    +
    --- End diff --
    
    Hey @balidani. I think there's no need for these profiles anymore.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77774317
  
    One more thought: the triplet rebuilding in every iteration seems quite expensive. Would it make sense to keep the triplets (or quadruplets, with the vertex value) as the data in the solution set?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26074204
  
    --- Diff: flink-staging/flink-gelly/pom.xml ---
    @@ -52,4 +52,35 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     	</dependencies>
    +
    +    <!-- See main pom.xml for explanation of profiles -->
    +    <profiles>
    +        <profile>
    +            <id>hadoop-1</id>
    +            <activation>
    +                <property>
    +                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
    +                </property>
    +            </activation>
    +            <dependencies>
    +                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
    +                <dependency>
    +                    <groupId>com.google.guava</groupId>
    +                    <artifactId>guava</artifactId>
    +                    <version>${guava.version}</version>
    +                    <scope>provided</scope>
    +                </dependency>
    +            </dependencies>
    +        </profile>
    +        <profile>
    --- End diff --
    
    Perfect, thanks @rmetzger!



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77749211
  
    This adds a Guava dependency and does some Hadoop profile magic. We have to be very careful when merging this to stay consistent with the changes in #454 by @rmetzger.
    
    I think that #454 should significantly simplify the Hadoop profile and Guava business.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-94306580
  
    Hi @vasia!
    
    I fixed the problems, sorry about that. I had to resolve some merge conflicts and it appears that I did not notice some things. Next time I'll check the diff before I commit :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26008874
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.common.functions.IterationRuntimeContext;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
    +
    +	public abstract void apply(M message, VV vertexValue);
    --- End diff --
    
    This is so that we can take advantage of the workset and emit only changed values here when the update function is incremental.
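A minimal plain-Java sketch of the rationale above (not the PR's code): `apply` returns nothing and calls `setResult` only when the value should change, so an unchanged vertex emits nothing. `SketchApplyFunction`, its `invoke` driver and `MinDistanceApply` are hypothetical stand-ins for Gelly's `ApplyFunction` and the workset collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PR's ApplyFunction: apply() returns nothing
// and user code calls setResult() only when the vertex value should change.
abstract class SketchApplyFunction<VV, M> {
    // Plays the role of the collector that feeds the next workset.
    private List<VV> emitted = new ArrayList<VV>();

    public abstract void apply(M message, VV vertexValue);

    protected void setResult(VV newValue) {
        emitted.add(newValue);
    }

    // Hypothetical driver: runs apply() once and returns what was emitted.
    // An empty list means the vertex would not be re-activated.
    List<VV> invoke(M message, VV vertexValue) {
        emitted = new ArrayList<VV>();
        apply(message, vertexValue);
        return emitted;
    }
}

// Incremental update: emit only when the summed candidate improves the value.
class MinDistanceApply extends SketchApplyFunction<Double, Double> {
    @Override
    public void apply(Double candidate, Double current) {
        if (candidate < current) { // '<' unboxes both Doubles
            setResult(candidate);
        }
    }
}
```

Because `setResult` is simply never called for an unchanged vertex, the user function itself decides what enters the workset, and the framework does not need to compare old and new values.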



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255528
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    +
    +			if (triplet.f2.getValue() == 0.0) {
    +				return new Tuple2<Long, Double>(triplet.f2.getId(), 0.0);
    +			}
    --- End diff --
    
    The iteration should terminate when there are no more active vertices, i.e. when all have converged to a value. If you only emit the smaller values in Apply (see my next comment), this should work fine :)
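A single-machine simulation of the termination behaviour described above, assuming directed edges given as `{src, trg, weight}` arrays with non-negative weights. It is a sketch, not Gelly's implementation: only out-edges of vertices updated in the previous superstep are gathered, Sum keeps the minimum candidate per target, and Apply updates (and re-activates) a vertex only when its distance strictly decreases. The loop ends as soon as the active set is empty or `maxIterations` is reached. `GsaSsspSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Single-machine simulation of a gather-sum-apply SSSP loop (not Gelly code).
class GsaSsspSketch {

    // edges[i] = {src, trg, weight}; vertex ids are 0..numVertices-1.
    static double[] run(int numVertices, double[][] edges, int source, int maxIterations) {
        double[] dist = new double[numVertices];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[source] = 0.0;

        // The "workset": vertices whose value changed in the last superstep.
        Set<Integer> active = new HashSet<Integer>();
        active.add(source);

        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Gather + Sum: for each edge leaving an active vertex, propose
            // dist[src] + weight and keep the minimum proposal per target.
            Map<Integer, Double> summed = new HashMap<Integer, Double>();
            for (double[] e : edges) {
                int src = (int) e[0];
                int trg = (int) e[1];
                if (!active.contains(src)) {
                    continue;
                }
                double candidate = dist[src] + e[2];
                Double best = summed.get(trg);
                if (best == null || candidate < best) {
                    summed.put(trg, candidate);
                }
            }

            // Apply: update and re-activate only vertices whose distance shrank.
            Set<Integer> next = new HashSet<Integer>();
            for (Map.Entry<Integer, Double> entry : summed.entrySet()) {
                int v = entry.getKey();
                double value = entry.getValue();
                if (value < dist[v]) {
                    dist[v] = value;
                    next.add(v);
                }
            }
            active = next; // an empty workset means every vertex has converged
        }
        return dist;
    }
}
```

With non-negative weights no candidate can be smaller than the source's 0.0, so under this emit-only-if-smaller rule the source is never overwritten, and the zero-valued-target special case in the Gather function above appears unnecessary.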



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26005198
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.common.functions.IterationRuntimeContext;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
    +
    +	public abstract void apply(M message, VV vertexValue);
    --- End diff --
    
    Could this be simplified to `public VV apply(M message, VV vertexValue)`?
    
    The `setResult(...)` method could be dropped then, might simplify many programs.
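For comparison, a hypothetical sketch of the return-style signature suggested above. Since `apply` always returns a value, the caller has to compare it with the old one to know whether the vertex changed; here that comparison is assumed to be `Objects.equals`, which in turn assumes `VV` has a meaningful `equals()`. `ReturningApplyFunction` and `ReturningApplyDriver` are illustrative names, not Gelly API.

```java
import java.util.Objects;

// Hypothetical return-style variant: apply() returns the new vertex value
// instead of calling setResult().
interface ReturningApplyFunction<VV, M> {
    VV apply(M message, VV vertexValue);
}

class ReturningApplyDriver {
    // Assumption: the framework compares old and new values itself to decide
    // whether the vertex stays in the workset for the next superstep.
    static <VV, M> boolean staysActive(ReturningApplyFunction<VV, M> f, M message, VV current) {
        VV next = f.apply(message, current);
        return !Objects.equals(next, current);
    }
}
```

The trade-off: user code gets simpler, but the framework must either perform this comparison for every vertex or emit every value, which would give up the workset savings that the `setResult` style allows.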



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255045
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    +
    +			if (triplet.f2.getValue() == 0.0) {
    +				return new Tuple2<Long, Double>(triplet.f2.getId(), 0.0);
    +			}
    +
    +			return new Tuple2<Long, Double>(triplet.f2.getId(),
    +					triplet.f0.getValue() + (double) triplet.f1.getValue());
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathSum
    +			extends SumFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> sum(Tuple2<Long, Double> arg0, Tuple2<Long, Double> arg1) {
    +			return new Tuple2<Long, Double>(arg0.f0, Math.min(arg0.f1, arg1.f1));
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathApply
    +			extends ApplyFunction<Long, Double, Double, Double> {
    +		@Override
    +		public void apply(Tuple2<Long, Double> summed,
    +				Vertex<Long, Double> trg, Collector<Vertex<Long, Double>> collector) {
    +
    +			if (summed.f1 != trg.getValue()) {
    --- End diff --
    
    I think you only need to emit the new value if it's smaller than the current one.
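A small sketch of the condition suggested above, with one extra observation: in the diff line `summed.f1 != trg.getValue()`, both operands are `Double` objects, so `!=` compares references rather than numeric values. Unboxing and comparing with `<` avoids that and emits only improvements. `SsspApplyCondition` is a hypothetical helper, not part of the PR.

```java
// Hypothetical helper for the Apply step of SSSP (not the PR's code).
class SsspApplyCondition {

    // True only when the summed distance improves the vertex value, which is
    // the only case in which Apply needs to emit. Comparing unboxed doubles
    // with '<' avoids '!=' on two Double objects, which checks identity.
    static boolean shouldEmit(Double summed, Double current) {
        return summed.doubleValue() < current.doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(shouldEmit(3.0, 5.0)); // true: shorter path found
        System.out.println(shouldEmit(5.0, 5.0)); // false: unchanged, stay inactive
        System.out.println(shouldEmit(7.0, 5.0)); // false: longer path ignored
    }
}
```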



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25391476
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		// Execute the GSA iteration
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
    +				gather, sum, apply, maxIterations);
    +		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Double, Double, Double> {
    +		@Override
    +		public Double gather(Triplet<Double, Double> triplet) {
    +			return triplet.getSource() + triplet.getEdge();
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathSum
    +			extends SumFunction<Double, Double, Double> {
    +		@Override
    +		public Double sum(Double arg0, Double arg1) {
    +			return Math.min(arg0, arg1);
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathApply
    +			extends ApplyFunction<Double, Double, Double> {
    +		@Override
    +		public void apply(Double summed, Double target) {
    --- End diff --
    
    Should this be done in `GatherSumApplyIteration` and the GGC example as well?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-95866092
  
    I noticed that there is no GSAPageRank example in the latest version of the code.
    I'd like to take care of that once this PR gets merged, to understand this model on a practical level, and @vasia can write the documentation. If there are no objections, I will open a JIRA for this after I see the code in "production" :D



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25389899
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Triplet.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +import java.io.Serializable;
    +
    --- End diff --
    
    Can you add a short description for this class?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25390113
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		// Execute the GSA iteration
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
    +				gather, sum, apply, maxIterations);
    +		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Double, Double, Double> {
    +		@Override
    +		public Double gather(Triplet<Double, Double> triplet) {
    +			return triplet.getSource() + triplet.getEdge();
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathSum
    +			extends SumFunction<Double, Double, Double> {
    +		@Override
    +		public Double sum(Double arg0, Double arg1) {
    +			return Math.min(arg0, arg1);
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathApply
    +			extends ApplyFunction<Double, Double, Double> {
    +		@Override
    +		public void apply(Double summed, Double target) {
    --- End diff --
    
    Would it be better to call the args newValue and currentValue?



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77774282
  
    The vertex-centric iterations use semantic annotations extensively to avoid sorting/partitioning where possible. As a result, the plan for the vertex-centric iterations only partitions once and sorts once, even though it uses either two coGroups or two joins plus a reduce. I think this would help a lot here as well. In general, plan optimization may be something worth looking into for Gelly.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-95868450
  
    Hey Andra,
    
    there are several issues we'll need to fix/add after this is merged. A PageRank example is one of them :) I have collected them and will open all the related JIRAs once this is merged. Thanks for volunteering to take care of one already :))



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r26008662
  
    --- Diff: flink-staging/flink-gelly/pom.xml ---
    @@ -52,4 +52,35 @@ under the License.
     			<scope>test</scope>
     		</dependency>
     	</dependencies>
    +
    +    <!-- See main pom.xml for explanation of profiles -->
    +    <profiles>
    +        <profile>
    +            <id>hadoop-1</id>
    +            <activation>
    +                <property>
    +                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
    +                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
    +                </property>
    +            </activation>
    +            <dependencies>
    +                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
    +                <dependency>
    +                    <groupId>com.google.guava</groupId>
    +                    <artifactId>guava</artifactId>
    +                    <version>${guava.version}</version>
    +                    <scope>provided</scope>
    +                </dependency>
    +            </dependencies>
    +        </profile>
    +        <profile>
    --- End diff --
    
    Regarding the pom, we've actually merged this change as part of #429. We added the guava dependency after [this discussion](https://mail-archives.apache.org/mod_mbox/flink-dev/201502.mbox/%3CCAJAsdNijOSZwqnn1ugw0qwJvXurmjb%3DOz64-sd3u97gZcbcW0Q%40mail.gmail.com%3E) in the dev mailing list. If there's a better way to deal with this, please let me know! The hadoop2 profile seems to be there by mistake, I'll fix that :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645615
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class represents a <sourceVertex, edge> pair
    + * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
    + * @param <VV> the vertex value type
    + * @param <EV> the edge value type
    + */
    +public class RichEdge<VV extends Serializable, EV extends Serializable>
    --- End diff --
    
    I don't really like the name `RichEdge`, but I don't have a better suggestion. It's essentially a pair of the neighboring edge and vertex value, right? Should we simply call it `Neighbor`, `NeighborData` or `NeighborValue`?
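    For illustration, a minimal sketch of what a `Neighbor` wrapper could look like under the naming suggested above. It pairs the neighbor's vertex value with the value of the connecting edge and exposes named getters instead of `f0`/`f1`. The getter names are hypothetical; the `RichEdge` in this PR wraps `Tuple2<VV, EV>`, and this plain-Java sketch does not depend on Flink.

    ```java
    import java.io.Serializable;
    import java.util.Objects;

    /**
     * Hypothetical sketch: the value of a neighboring vertex together with
     * the value of the edge that connects it to the vertex being updated.
     */
    final class Neighbor<VV extends Serializable, EV extends Serializable> implements Serializable {

        private static final long serialVersionUID = 1L;

        private final VV neighborValue;
        private final EV edgeValue;

        Neighbor(VV neighborValue, EV edgeValue) {
            this.neighborValue = neighborValue;
            this.edgeValue = edgeValue;
        }

        VV getNeighborValue() {
            return neighborValue;
        }

        EV getEdgeValue() {
            return edgeValue;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Neighbor)) {
                return false;
            }
            Neighbor<?, ?> other = (Neighbor<?, ?>) o;
            return Objects.equals(neighborValue, other.neighborValue)
                    && Objects.equals(edgeValue, other.edgeValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(neighborValue, edgeValue);
        }
    }
    ```

    With this wrapper, the SSSP gather step reads as `neighbor.getNeighborValue() + neighbor.getEdgeValue()`.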



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-75546102
  
    Hi @vasia!
    
    Thanks for the review! I started making the changes, but I got stuck. I get an exception about `GatherUdf` not being Serializable. I tried copying from `VertexCentricIteration`, and I'm not sure what difference causes the exception.
    
    Here is the full stack trace: https://gist.github.com/balidani/e6f716d3562d2aa131fb
    Here is `GatherSumApplyIteration.java`: https://gist.github.com/balidani/1139eddcb1b4cbf60404
    
    The GatherUdf is at the end. I didn't bother fully converting the Sum and Apply UDFs, because I already get an exception for gather.
    
    What exactly is `ResultTypeQueryable`? Does it have anything to do with serializability? Should we implement it for all UDFs, just like in `VertexCentricIteration`?
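    The linked gist is not reproduced here, so the following is an assumption and not a diagnosis of that code. One common cause of this kind of `NotSerializableException` is a UDF declared as a non-static inner (or anonymous) class. Such an instance holds an implicit reference to its enclosing object, so Java serialization also tries to write the outer class. The plain-Java sketch below shows the effect with hypothetical class names. Declaring the UDF as a `static` nested class removes the hidden reference. As far as I understand, this is separate from `ResultTypeQueryable`, which lets a function report its produced type to Flink and does not affect serializability.

    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    // Hypothetical stand-in for an iteration class that is itself NOT Serializable.
    class IterationHolder {

        int maxIterations = 10;

        // Non-static inner class: each instance captures IterationHolder.this.
        class InnerGatherUdf implements Serializable {
            private static final long serialVersionUID = 1L;

            int limit() {
                return maxIterations; // reads the enclosing instance's field
            }
        }

        // Static nested class: no hidden reference to an enclosing instance.
        static class StaticGatherUdf implements Serializable {
            private static final long serialVersionUID = 1L;
        }

        InnerGatherUdf newInner() {
            return new InnerGatherUdf();
        }
    }

    final class SerializationCheck {

        private SerializationCheck() {}

        /** Returns true if the object can be written with Java serialization. */
        static boolean isSerializable(Object udf) {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(udf);
                return true;
            } catch (NotSerializableException e) {
                return false; // some reachable object (here, the outer class) is not Serializable
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    ```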
    
    Cheers!



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25254768
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    --- End diff --
    
    How about we make a custom `Triplet` type and add `getSrcVertex()`, `getEdge()` and `getTrgVertex()` methods, so that we don't need to use the tuple fields here? Using the tuple fields directly makes the code a bit hard to understand.
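    A minimal sketch of such a `Triplet` type, using the accessor names proposed above. The generic type parameters and the plain-Java setting are assumptions for illustration. In Gelly the three components would be `Vertex` and `Edge` objects rather than raw values, and the class would also need to be serializable.

    ```java
    /**
     * Hypothetical Triplet: the source vertex, the connecting edge and the target
     * vertex of one edge, exposed through named getters instead of f0/f1/f2.
     */
    final class Triplet<S, E, T> {

        private final S srcVertex;
        private final E edge;
        private final T trgVertex;

        Triplet(S srcVertex, E edge, T trgVertex) {
            this.srcVertex = srcVertex;
            this.edge = edge;
            this.trgVertex = trgVertex;
        }

        S getSrcVertex() {
            return srcVertex;
        }

        E getEdge() {
            return edge;
        }

        T getTrgVertex() {
            return trgVertex;
        }
    }

    final class TripletGatherExample {

        private TripletGatherExample() {}

        // SSSP gather written against the named getters: candidate = source distance + edge weight.
        static double gather(Triplet<Double, Double, Double> triplet) {
            return triplet.getSrcVertex() + triplet.getEdge();
        }
    }
    ```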



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-94183509
  
    Thanks @balidani for the update! I left some inline comments which should be fairly easy to address.
    I would like to merge this soon and then we should open separate issues for documentation and configuration.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77774172
  
    Regarding the rest of the comments:
    - shall we change the SumFunction, ApplyFunction, etc. to have 2 versions, i.e. a single-abstract-method interface and a rich one? If yes, it would be a good idea to do the same for vertex-centric.
    - the run/create split is indeed for configuration, aggregators and broadcast sets. We did the same for vertex-centric in #402. I do like your API proposal more though, I guess we should change both :)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-82443079
  
    Hi!
    
    I pushed some changes. @vasia told me that she and @StephanEwen had decided that iterating on triplets is infeasible, so we should operate on `<srcVertex, edge>` pairs instead. Only a few changes were required, since neither of the examples requires the target vertex value anyway.
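    To make the `<srcVertex, edge>` formulation concrete, here is a minimal single-machine sketch of gather-sum-apply SSSP supersteps in plain Java. It is an illustration under assumptions, not Gelly's distributed implementation. Vertices are `long` ids with `double` distances, `WeightedEdge` is a hypothetical record type, and only vertices whose distance improved are reported as updated. The gather step reads only the source value and the edge value, never the target value.

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class GsaSsspSketch {

        /** Hypothetical directed, weighted edge. */
        static final class WeightedEdge {
            final long src;
            final long trg;
            final double weight;

            WeightedEdge(long src, long trg, double weight) {
                this.src = src;
                this.trg = trg;
                this.weight = weight;
            }
        }

        private GsaSsspSketch() {}

        /** Runs one synchronous superstep in place and returns the vertices whose distance changed. */
        static Map<Long, Double> superstep(Map<Long, Double> distances, List<WeightedEdge> edges) {
            // Gather + sum: for each target, the minimum of (source distance + edge weight).
            Map<Long, Double> summed = new HashMap<>();
            for (WeightedEdge e : edges) {
                double candidate = distances.get(e.src) + e.weight; // uses only <srcValue, edgeValue>
                summed.merge(e.trg, candidate, Math::min);
            }
            // Apply: keep a candidate only if it improves the current distance.
            Map<Long, Double> updated = new HashMap<>();
            for (Map.Entry<Long, Double> entry : summed.entrySet()) {
                if (entry.getValue() < distances.get(entry.getKey())) {
                    updated.put(entry.getKey(), entry.getValue());
                }
            }
            distances.putAll(updated); // updates become visible only in the next superstep
            return updated;
        }

        /** Iterates until no distance changes or maxIterations is reached. */
        static Map<Long, Double> run(Map<Long, Double> distances, List<WeightedEdge> edges, int maxIterations) {
            for (int i = 0; i < maxIterations; i++) {
                if (superstep(distances, edges).isEmpty()) {
                    break;
                }
            }
            return distances;
        }
    }
    ```

    The source vertex starts at `0.0` and every other vertex at `Double.POSITIVE_INFINITY`, mirroring `InitVerticesMapper` in the example.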
    
    Regarding the other suggestions, @vasia and I think it would be a good idea to push them in a separate PR, because they affect both GSA and Vertex centric iterations.
    
    Regarding the cosmetic changes that @andralungu suggested: I changed SSSP to use `SingleSourceShortestPathData`, and for greedy graph coloring I just generated a graph, like in `LabelPropagationExample`.
    
    Cheers!



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77749391
  
    In the core API, we have started to define all functions (map, join, ...; here it would be SumFunction, ApplyFunction) as single-abstract-method interfaces. That allows using Java 8 lambdas (Scala may pick this up as well). For each function, there is a `RichFunction` variant, which has the `open()` and `close()` methods and `getRuntimeContext()` (here it would be the `preSuperstep()` and `postSuperstep()` methods).
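A minimal sketch of that pattern applied to a sum function, assuming hypothetical names (`SumFn`, `RichSumFn`, and `SumFnDemo` are illustrative, not the Flink or Gelly API): the single-abstract-method interface can be implemented by a Java 8 lambda, while the abstract rich variant adds `preSuperstep()`/`postSuperstep()` hooks and a superstep number.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical single-abstract-method interface: implementable with a lambda.
@FunctionalInterface
interface SumFn<M> extends Serializable {
    M sum(M a, M b);
}

// Hypothetical rich variant: same contract plus per-superstep lifecycle hooks.
abstract class RichSumFn<M> implements SumFn<M> {
    private int superstep;

    void setSuperstep(int superstep) { this.superstep = superstep; }

    protected int getSuperstepNumber() { return superstep; }

    public void preSuperstep() {}   // e.g. initialize per-superstep state

    public void postSuperstep() {}  // e.g. publish aggregator values
}

final class SumFnDemo {
    // Folds a non-empty list with fn, calling the lifecycle hooks only if fn is rich.
    @SuppressWarnings("unchecked")
    static <M> M reduce(List<M> values, SumFn<M> fn, int superstep) {
        RichSumFn<M> rich = (fn instanceof RichSumFn) ? (RichSumFn<M>) fn : null;
        if (rich != null) {
            rich.setSuperstep(superstep);
            rich.preSuperstep();
        }
        M acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = fn.sum(acc, values.get(i));
        }
        if (rich != null) {
            rich.postSuperstep();
        }
        return acc;
    }
}
```

A plain lambda such as `(a, b) -> Math.min(a, b)` is enough for most sums; users only subclass the rich variant when they need the hooks.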



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25255193
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
    +				.run(new GSASingleSourceShortestPathsExample()).getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  GraphAlgorithm Functions
    +	// --------------------------------------------------------------------------------------------
    +
    +	@Override
    +	public Graph<Long, Double, Double> run(Graph<Long, Double, Double> input) {
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Long, Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Long, Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = input.createGatherSumApplyIteration(
    +				gather, sum, apply, this.maxIterations);
    +		return input.mapVertices(new InitVerticesMapper<Long>(this.srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Long, Double, Double, Double> {
    +		@Override
    +		public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +				Edge<Long, Double>, Vertex<Long, Double>> triplet) {
    --- End diff --
    
    Agreed! Where should the `Triplet` class be implemented?
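One possible shape for such a class, as a hypothetical standalone sketch: `SimpleVertex` and `SimpleEdge` are minimal local stand-ins for Gelly's `Vertex` and `Edge`, and the `Triplet` below may differ from whatever class Gelly ends up with. Named getters replace the anonymous `f0`/`f1`/`f2` fields of `Tuple3`, which is the readability gain being discussed.

```java
// Minimal stand-ins for Gelly's Vertex and Edge, for illustration only.
final class SimpleVertex<K, VV> {
    final K id;
    final VV value;

    SimpleVertex(K id, VV value) {
        this.id = id;
        this.value = value;
    }
}

final class SimpleEdge<K, EV> {
    final K source;
    final K target;
    final EV value;

    SimpleEdge(K source, K target, EV value) {
        this.source = source;
        this.target = target;
        this.value = value;
    }
}

// Hypothetical Triplet: source vertex, edge, and target vertex with typed getters.
final class Triplet<K, VV, EV> {
    private final SimpleVertex<K, VV> source;
    private final SimpleEdge<K, EV> edge;
    private final SimpleVertex<K, VV> target;

    Triplet(SimpleVertex<K, VV> source, SimpleEdge<K, EV> edge, SimpleVertex<K, VV> target) {
        // Reject triplets whose edge does not connect the given vertices
        if (!edge.source.equals(source.id) || !edge.target.equals(target.id)) {
            throw new IllegalArgumentException("edge endpoints do not match the vertices");
        }
        this.source = source;
        this.edge = edge;
        this.target = target;
    }

    SimpleVertex<K, VV> getSource() { return source; }

    SimpleEdge<K, EV> getEdge() { return edge; }

    SimpleVertex<K, VV> getTarget() { return target; }
}

final class TripletGatherDemo {
    // SSSP gather written against the getters: source distance + edge weight.
    static double gather(Triplet<Long, Double, Double> t) {
        return t.getSource().value + t.getEdge().value;
    }
}
```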



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645462
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -79,7 +83,8 @@
     	private final DataSet<Edge<K, EV>> edges;
     
     	/**
    -	 * Creates a graph from two DataSets: vertices and edges
    +	 * Creates a graph from two DataSets: vertices and edges and allow setting
    +	 * the undirected property
    --- End diff --
    
    no undirected property here :-)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25254212
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription, GraphAlgorithm<Long, Double, Double> {
    --- End diff --
    
    Since it's not a library method, there's no need to implement GraphAlgorithm or have a run() method. You can move the body to main().



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-75755326
  
    Hi @vasia, all!
    
    So I implemented the changes that @vasia suggested. I also added a commit that adds the Hadoop profiles to the `flink-gelly` project in `pom.xml`, so tests using files won't fail.
    
    Are there any more things I should change?
    
    Cheers!



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by andralungu <gi...@git.apache.org>.
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-77341085
  
    I'm not sure why the last Travis check failed, because the pom.xml seems to be correct.
    
    But apart from that, this looks very nice and consistent with the vertex-centric approach.
    However, there is a bug in runVertexCentricIteration: the superstep number does not change when you run tests in Collection mode, and I have a hunch that it would also be reproducible here.
    
    Apart from that, good to merge from my side.
    
    And as future work, once I finish the vertexCentricIteration extensions that will allow the user to choose the messaging direction, we should extend GSA to do the same :D
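A hypothetical sketch of what a direction option could mean for the gather phase: it decides which endpoint of each edge a vertex reads from. The `Direction` enum and `DirectedNeighbors` helper are invented for illustration; `IN` roughly corresponds to the current `<srcVertex, edge>` behavior, where a target vertex gathers from its sources.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical gather direction: which neighbors a vertex reads from.
enum Direction { IN, OUT, ALL }

final class DirectedNeighbors {
    // Edges are {source, target} pairs; returns, per vertex, the ids it gathers from.
    static Map<Long, Set<Long>> neighbors(List<long[]> edges, Direction dir) {
        Map<Long, Set<Long>> result = new TreeMap<>();
        for (long[] e : edges) {
            long src = e[0];
            long trg = e[1];
            if (dir == Direction.IN || dir == Direction.ALL) {
                // the target gathers from its in-neighbor (the edge's source)
                result.computeIfAbsent(trg, k -> new TreeSet<>()).add(src);
            }
            if (dir == Direction.OUT || dir == Direction.ALL) {
                // the source gathers from its out-neighbor (the edge's target)
                result.computeIfAbsent(src, k -> new TreeSet<>()).add(trg);
            }
        }
        return result;
    }
}
```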



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-75557616
  
    I had an offline discussion with @balidani and we figured that the exception was actually coming from the test, not the GatherSumApplyIteration. So, no issue here.
    
    Regarding the `ResultTypeQueryable` interface and types, I will point you to [this mailing list thread](https://mail-archives.apache.org/mod_mbox/flink-dev/201501.mbox/%3CCANC1h_vyjXgL=0Ddgb6gihe3JmmZ+NixN3Sdgji+ZgCiZ61dNQ@mail.gmail.com%3E) and [this doc](https://github.com/apache/flink/blob/master/docs/internal_types_serialization.md). I hope these clear things up a bit :-)
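For context on why such an interface exists: Java erases generic type arguments at runtime, so the output type of a generic function instance cannot always be recovered from the object itself. The plain-Java sketch below uses a hypothetical `TypeQueryable` interface as a stand-in for Flink's `ResultTypeQueryable`, with `Class` objects standing in for Flink's `TypeInformation`.

```java
import java.util.function.Function;

// Hypothetical stand-in for an interface like Flink's ResultTypeQueryable.
interface TypeQueryable<T> {
    Class<T> getProducedType();
}

// A generic function whose output type depends on how it is instantiated.
final class ConstantFn<T> implements Function<Object, T>, TypeQueryable<T> {
    private final T constant;
    private final Class<T> type;

    ConstantFn(T constant, Class<T> type) {
        this.constant = constant;
        this.type = type;
    }

    @Override
    public T apply(Object ignored) {
        return constant;
    }

    // The instance reports its output type explicitly, because the type
    // argument T is erased and cannot be read back from getClass().
    @Override
    public Class<T> getProducedType() {
        return type;
    }
}

final class ErasureDemo {
    static boolean sameRuntimeClass() {
        ConstantFn<String> s = new ConstantFn<>("a", String.class);
        ConstantFn<Long> l = new ConstantFn<>(1L, Long.class);
        // Both are plain ConstantFn at runtime: the type arguments are gone.
        return s.getClass() == l.getClass();
    }
}
```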



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r24763748
  
    --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.test;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +
    +@RunWith(Parameterized.class)
    +public class GatherSumApplyITCase extends MultipleProgramsTestBase {
    +
    +	public GatherSumApplyITCase(MultipleProgramsTestBase.ExecutionMode mode){
    +		super(mode);
    +	}
    +
    +	private String resultPath;
    +	private String expectedResult;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Before
    +	public void before() throws Exception{
    +		resultPath = tempFolder.newFile().toURI().toString();
    +	}
    +
    +	@After
    +	public void after() throws Exception{
    +		compareResultsByLinesInMemory(expectedResult, resultPath);
    +	}
    +
    +	@Test
    +	public void testGreedyGraphColoring() throws Exception {
    +		/*
    +		 * This test runs the Greedy Graph Coloring algorithm using a GSA iteration
    +		 * The expected result is that the lowest vertex value (1) gets propagated to all vertices
    +		 */
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
    +				TestGraphUtils.getLongLongEdgeData(env), env);
    +
    +		// Gather the target vertices into a one-element set
    +		GatherFunction<Long, Long, Long, HashSet<Vertex<Long,Long>>> gather =
    +				new GatherFunction<Long, Long, Long, HashSet<Vertex<Long,Long>>>() {
    +
    +					@Override
    +					public Tuple2<Long, HashSet<Vertex<Long, Long>>> gather(Tuple3<Vertex<Long, Long>,
    +							Edge<Long, Long>, Vertex<Long, Long>> triplet) {
    +
    +						HashSet<Vertex<Long, Long>> result = new HashSet<Vertex<Long, Long>>();
    +						result.add(triplet.f2);
    +
    +						return new Tuple2<Long, HashSet<Vertex<Long, Long>>>(triplet.f0.getId(), result);
    +					}
    +				};
    +
    +		// Merge the sets between neighbors
    +		SumFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>> sum =
    +				new SumFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>>() {
    +
    +					@Override
    +					public Tuple2<Long, HashSet<Vertex<Long, Long>>> sum(
    +							Tuple2<Long, HashSet<Vertex<Long, Long>>> arg0,
    +							Tuple2<Long, HashSet<Vertex<Long, Long>>> arg1) {
    +
    +						HashSet<Vertex<Long, Long>> result = new HashSet<Vertex<Long, Long>>();
    +						result.addAll(arg0.f1);
    +						result.addAll(arg1.f1);
    +
    +						return new Tuple2<Long, HashSet<Vertex<Long, Long>>>(arg0.f0, result);
    +					}
    +				};
    +
    +		// Find the minimum vertex id in the set which will be propagated
    +		ApplyFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>> apply =
    +				new ApplyFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>>() {
    +
    +					@Override
    +					public void apply(Tuple2<Long, HashSet<Vertex<Long, Long>>> set,
    +							Vertex<Long, Long> src, Collector<Vertex<Long, Long>> collector) {
    +						long minValue = src.getValue();
    +						for (Vertex<Long, Long> v : set.f1) {
    +							if (v.getValue() < minValue) {
    +								minValue = v.getValue();
    +							}
    +						}
    +
    +						// This is the condition that enables the termination of the iteration
    +						if (minValue != src.getValue()) {
    +							collector.collect(new Vertex<Long, Long>(src.getId(), minValue));
    +						}
    +					}
    +				};
    +
    +		Graph<Long, Long, Long> minColoring = graph.runGatherSumApplyIteration(gather, sum, apply, 16);
    +		minColoring.getVertices().writeAsCsv(resultPath);
    +
    +		env.execute();
    +
    +		expectedResult = "1,1\n" +
    +				"2,1\n" +
    +				"3,1\n" +
    +				"4,1\n" +
    +				"5,1\n";
    +	}
    +
    +	@Test
    +	public void testSingleSourceShortestPath() throws Exception {
    +		/*
    +		 * This test runs the Single Source Shortest Path algorithm using a GSA iteration
    +		 */
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		// Start off with 0 for the source vertex and INF for all other vertices
    +		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
    +		vertices.add(new Vertex<Long, Double>(1L, 0.0));
    +		vertices.add(new Vertex<Long, Double>(2L, Double.POSITIVE_INFINITY));
    +		vertices.add(new Vertex<Long, Double>(3L, Double.POSITIVE_INFINITY));
    +		vertices.add(new Vertex<Long, Double>(4L, Double.POSITIVE_INFINITY));
    +		vertices.add(new Vertex<Long, Double>(5L, Double.POSITIVE_INFINITY));
    +
    +		Graph<Long, Double, Long> graph = Graph.fromDataSet(env.fromCollection(vertices),
    +				TestGraphUtils.getLongLongEdgeData(env), env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Long, Double, Long, Double> gather =
    +				new GatherFunction<Long, Double, Long, Double>() {
    +					@Override
    +					public Tuple2<Long, Double> gather(Tuple3<Vertex<Long, Double>,
    +							Edge<Long, Long>, Vertex<Long, Double>> triplet) {
    +
    +						if (triplet.f2.getValue() == 0.0) {
    --- End diff --
    
    Normally comparing doubles with `==` is not okay, but here we never change the initial 0.0 value.
    Should we still use an epsilon check, even though it would make the code (a bit) less readable?
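For comparison, a small plain-Java sketch of both checks (`EPSILON` is an arbitrary illustrative tolerance): an exact `==` works for a 0.0 that is assigned and never recomputed, because 0.0 is exactly representable, but it can fail for a value produced by floating-point arithmetic, which is where an epsilon check earns its keep.

```java
final class DoubleChecks {
    // Arbitrary illustrative tolerance; a real choice depends on the value scale.
    static final double EPSILON = 1e-9;

    // Safe when the value was assigned as a literal 0.0 and never recomputed.
    static boolean isZeroExact(double d) {
        return d == 0.0;
    }

    // Robust to rounding error from arithmetic such as 0.1 + 0.2 - 0.3.
    static boolean isZeroWithEpsilon(double d) {
        return Math.abs(d) < EPSILON;
    }
}
```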



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645507
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.RichEdge;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +/**
    + * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
    + */
    +public class GSAConnectedComponentsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if (!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
    +		DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
    +
    +		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// Simply return the vertex value of each vertex
    +		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
    +
    +		// Select the lower value among neighbors
    +		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
    +
    +		// Set the lower value for each vertex
    +		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
    +
    +		// Execute the GSA iteration
    +		Graph<Long, Long, NullValue> result =
    +				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
    --- End diff --
    
    greedyGraphColoring?
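For reference, the gather/sum/apply comments in this example (take the neighbor's value, keep the lower one, update a vertex when its value decreases) amount to min-label propagation. A sequential, in-memory Java sketch of that loop follows; it is not Gelly code, and treating each edge as undirected is an assumption of the sketch, since a purely directed gather would only propagate labels from sources to targets.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GsaConnectedComponents {
    // Edges are {u, v} pairs, treated as undirected (an assumption of this sketch).
    static Map<Long, Long> run(List<long[]> edges, int maxIterations) {
        // Init: every vertex starts with its own id as its component label
        Map<Long, Long> label = new HashMap<>();
        for (long[] e : edges) {
            label.put(e[0], e[0]);
            label.put(e[1], e[1]);
        }
        for (int i = 0; i < maxIterations; i++) {
            // Gather + sum: the minimum label among each vertex's neighbors
            Map<Long, Long> minNeighbor = new HashMap<>();
            for (long[] e : edges) {
                minNeighbor.merge(e[1], label.get(e[0]), Math::min);
                minNeighbor.merge(e[0], label.get(e[1]), Math::min);
            }
            // Apply: update only the vertices whose label decreases
            boolean changed = false;
            for (Map.Entry<Long, Long> m : minNeighbor.entrySet()) {
                if (m.getValue() < label.get(m.getKey())) {
                    label.put(m.getKey(), m.getValue());
                    changed = true;
                }
            }
            if (!changed) {
                break; // no vertex was updated, so the labels have converged
            }
        }
        return label;
    }
}
```

Since the result holds component labels rather than colors, a name like `connectedComponents` would describe it better than `greedyGraphColoring`.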



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25391723
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.example;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.example.utils.ExampleUtils;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.GatherSumApplyIteration;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.graph.gsa.Triplet;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
    + */
    +public class GSASingleSourceShortestPathsExample implements ProgramDescription {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Program
    +	// --------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
    +		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
    +
    +		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
    +
    +		// The path from src to trg through edge e costs src + e
    +		// If the target's distance is 0 (the target is the actual source), return 0 instead
    +		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
    +
    +		// Return the smaller path length to minimize distance
    +		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
    +
    +		// Iterate as long as the distance is updated
    +		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
    +
    +		// Execute the GSA iteration
    +		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
    +				gather, sum, apply, maxIterations);
    +		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
    +				.runGatherSumApplyIteration(iteration);
    +
    +		// Extract the vertices as the result
    +		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
    +
    +		// emit result
    +		if(fileOutput) {
    +			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
    +		} else {
    +			singleSourceShortestPaths.print();
    +		}
    +
    +		env.execute("GSA Single Source Shortest Paths Example");
    +	}
    +
    +	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
    +			implements MapFunction<Vertex<K, Double>, Double> {
    +
    +		private K srcVertexId;
    +
    +		public InitVerticesMapper(K srcId) {
    +			this.srcVertexId = srcId;
    +		}
    +
    +		public Double map(Vertex<K, Double> value) {
    +			if (value.f0.equals(srcVertexId)) {
    +				return 0.0;
    +			} else {
    +				return Double.POSITIVE_INFINITY;
    +			}
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Single Source Shortest Path UDFs
    +	// --------------------------------------------------------------------------------------------
    +
    +	private static final class SingleSourceShortestPathGather
    +			extends GatherFunction<Double, Double, Double> {
    +		@Override
    +		public Double gather(Triplet<Double, Double> triplet) {
    +			return triplet.getSource() + triplet.getEdge();
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathSum
    +			extends SumFunction<Double, Double, Double> {
    +		@Override
    +		public Double sum(Double arg0, Double arg1) {
    +			return Math.min(arg0, arg1);
    +		}
    +	};
    +
    +	private static final class SingleSourceShortestPathApply
    +			extends ApplyFunction<Double, Double, Double> {
    +		@Override
    +		public void apply(Double summed, Double target) {
    --- End diff --
    
    I'd do it in the other example as well, yes, to make it easier to understand what it does.
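To make the gather/sum/apply roles in the example above concrete, here is a minimal plain-Java sketch of one reading of the GSA SSSP semantics. It does not use Gelly. The class name `GsaSsspSketch`, the parallel-array edge representation, and gathering over every edge in every superstep are all assumptions for illustration. The diff above is cut off before the body of `apply()`, so the apply step here (adopt the summed value only when it shortens the distance) is an assumption inferred from the "Iterate as long as the distance is updated" comment.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java sketch of Gather-Sum-Apply SSSP semantics.
 * Not Gelly code: edges are parallel arrays (src[e] -> trg[e] with weight[e]).
 */
public class GsaSsspSketch {

    public static double[] sssp(int numVertices, int[] src, int[] trg, double[] weight,
                                int srcVertex, int maxIterations) {
        // Initialization, like InitVerticesMapper: 0.0 for the source, infinity elsewhere
        double[] dist = new double[numVertices];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[srcVertex] = 0.0;

        for (int iter = 0; iter < maxIterations; iter++) {
            // One accumulator per target vertex; null means nothing was gathered
            Double[] summed = new Double[numVertices];
            for (int e = 0; e < src.length; e++) {
                // Gather: source vertex value + edge value (assumed: every edge, every superstep)
                double candidate = dist[src[e]] + weight[e];
                // Sum: keep the minimum candidate per target vertex
                int t = trg[e];
                summed[t] = (summed[t] == null) ? candidate : Math.min(summed[t], candidate);
            }
            // Apply (assumed): adopt the summed value only if it shortens the distance
            boolean updated = false;
            for (int v = 0; v < numVertices; v++) {
                if (summed[v] != null && summed[v] < dist[v]) {
                    dist[v] = summed[v];
                    updated = true;
                }
            }
            // Stop early once no distance changed in a superstep
            if (!updated) {
                break;
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        // Edges: 0->1 (1.0), 0->2 (4.0), 1->2 (2.0), 2->3 (1.0)
        double[] d = sssp(4, new int[] {0, 0, 1, 2}, new int[] {1, 2, 2, 3},
                new double[] {1.0, 4.0, 2.0, 1.0}, 0, 10);
        System.out.println(Arrays.toString(d)); // [0.0, 1.0, 3.0, 4.0]
    }
}
```

Using `Math.min` as the sum works because min is associative and commutative, so partial sums for the same target vertex can be combined in any order.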



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-94606828
  
    Hi @balidani!
    
    I pulled your changes and made some improvements, mainly to the examples, so that they are consistent with the rest of Gelly's examples. You can see my changes in [this branch](https://github.com/vasia/flink/tree/gsa).
    
    I've been running some experiments on a small cluster and so far everything runs smoothly. GSA SSSP actually seems to be faster than vertex-centric SSSP for both input datasets I'm using :smile:
    
    I will run a few more experiments in the next days and if I find no problems and there are no objections, I will merge by the end of the week.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r28645482
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
    @@ -1011,6 +1016,13 @@ public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) {
     		}
     	}
     
    +	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer, Boolean> {
    --- End diff --
    
    Same here: this mapper class isn't needed anymore.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by balidani <gi...@git.apache.org>.
Github user balidani commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-94015915
  
    Okay, I updated the PR and Travis passed.



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/408



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/408#issuecomment-94020543
  
    Thanks @balidani! I'll try to review asap :-)



[GitHub] flink pull request: [FLINK-1514][Gelly] Add a Gather-Sum-Apply ite...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r25389945
  
    --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Triplet.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.gsa;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +
    +import java.io.Serializable;
    +
    +public class Triplet<VV extends Serializable, EV extends Serializable>
    +		extends Tuple3<VV, EV, VV> {
    +
    +	public Triplet() {}
    +
    +	public Triplet(VV src, EV edge, VV trg) {
    +		super(src, edge, trg);
    +	}
    +
    +	public VV getSource() {
    --- End diff --
    
    How about getSrcVertexValue(), getTrgVertexValue() and getEdgeValue() instead?
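For comparison, a minimal sketch of what the suggested accessor names could look like. It is hypothetical: the PR's `Triplet` extends Flink's `Tuple3<VV, EV, VV>`, whereas this standalone version keeps the three values in plain fields so it compiles without Flink on the classpath. Only the three getter names come from the suggestion above.

```java
import java.io.Serializable;

/**
 * Hypothetical standalone Triplet using the suggested accessor names.
 * The PR's version extends Tuple3<VV, EV, VV>; plain fields are used here instead.
 */
public class Triplet<VV extends Serializable, EV extends Serializable> implements Serializable {

    private static final long serialVersionUID = 1L;

    private VV srcVertexValue;
    private EV edgeValue;
    private VV trgVertexValue;

    public Triplet() {}

    public Triplet(VV srcVertexValue, EV edgeValue, VV trgVertexValue) {
        this.srcVertexValue = srcVertexValue;
        this.edgeValue = edgeValue;
        this.trgVertexValue = trgVertexValue;
    }

    // Value of the vertex the edge starts from
    public VV getSrcVertexValue() {
        return srcVertexValue;
    }

    // Value attached to the edge itself
    public EV getEdgeValue() {
        return edgeValue;
    }

    // Value of the vertex the edge points to
    public VV getTrgVertexValue() {
        return trgVertexValue;
    }

    public static void main(String[] args) {
        Triplet<Double, Double> t = new Triplet<>(2.0, 3.5, Double.POSITIVE_INFINITY);
        // An SSSP-style gather step reads more clearly with the new names
        double candidate = t.getSrcVertexValue() + t.getEdgeValue();
        System.out.println(candidate); // 5.5
    }
}
```

With these names, the gather step of the SSSP example would read `triplet.getSrcVertexValue() + triplet.getEdgeValue()`, which states directly which values are combined.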

