Posted to issues@flink.apache.org by joseprupi <gi...@git.apache.org> on 2016/05/31 04:10:49 UTC

[GitHub] flink pull request: Affinity Propagation

GitHub user joseprupi opened a pull request:

    https://github.com/apache/flink/pull/2053

    Affinity Propagation

    Hello,
    
    I have added an implementation of the Binary Affinity Propagation to Gelly. The Jira issue is:
    
    https://issues.apache.org/jira/browse/FLINK-1707
    
    This is not an implementation of the original Affinity Propagation; the details of the variant implemented here can be found at:
    
    http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf
    
    I have also added a simple example of it. To check this implementation I have compared the results with the scikit one for the original Affinity Propagation. Although the results are not exactly the same,
    they make sense to me, and I think that by properly parametrizing the epsilon in the Flink implementation and the number of iterations in scikit we could get the same result. I haven't found another implementation of this algorithm to compare the results against.
    
    The example in scikit clusters stock prices. I've modified it to cluster more stocks than the original one. The results using scikit are:
    
    Cluster 1: Pepsi, Coca Cola, Kellogg
    Cluster 2: Navistar
    Cluster 3: Kimberly-Clark, Colgate-Palmolive, Procter Gamble, Kraft Foods
    Cluster 4: Wal-Mart
    Cluster 5: Comcast, Time Warner, Cablevision
    Cluster 6: ConocoPhillips, Apple, GlaxoSmithKline, Microsoft, SAP, Pfizer, Novartis, 3M, Sanofi-Aventis, IBM, Chevron, DuPont de Nemours, CVS, Total, Caterpillar, Home Depot, Valero Energy, Yahoo, Exxon, Mc Donalds, Cisco, Unilever
    Cluster 7: Ryder, Sony, Amazon, Marriott, Canon, Texas instruments, Ford, Toyota, Honda, HP, Mitsubishi, Xerox
    Cluster 8: American express, Goldman Sachs, General Electrics, Wells Fargo, Bank of America, AIG, JPMorgan Chase
    Cluster 9: Raytheon, Boeing, Walgreen, Lockheed Martin, General Dynamics, Northrop Grumman
    
    Using the Flink implementation:
    
    Cluster 1: CVS, Walgreen
    Cluster 2: American express, Goldman Sachs, General Electrics, Wells Fargo, Bank of America, AIG, JPMorgan Chase
    Cluster 3: Navistar
    Cluster 4: Kimberly-Clark, Colgate-Palmolive, Procter Gamble, Kellogg
    Cluster 5: Ryder, Sony, Marriott, Canon, Texas instruments, Ford, Toyota, Honda, HP, Mitsubishi, Xerox
    Cluster 6: Amazon, Yahoo
    Cluster 7: Pepsi, Coca Cola, Kraft Foods
    Cluster 8: Comcast, Time Warner, Cablevision
    Cluster 9: Raytheon, Boeing, Lockheed Martin, General Dynamics, Northrop Grumman
    Cluster 10: Wal-Mart
    Cluster 11: ConocoPhillips, Apple, GlaxoSmithKline, Microsoft, SAP, Pfizer, Novartis, 3M, Sanofi-Aventis, IBM, Chevron, DuPont de Nemours, Total, Caterpillar, Home Depot, Valero Energy, Exxon, Mc Donalds, Cisco, Unilever
    
    These results can be reproduced with the scikit branch in my Flink repository and https://github.com/joseprupi/stockclusteringtest.
    
    As future work, I guess the original algorithm could be implemented, and maybe also the Capacitated Affinity Propagation mentioned in the paper.
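
    For reference, below is a minimal sketch of how the algorithm could be invoked once merged. The tiny graph, the similarity values and the class name are made up, and the assumption that self-edges act as the preferences is mine; only the constructor and result types follow the class added in this PR:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.library.AffinityPropagation;
    import org.apache.flink.types.NullValue;

    import java.util.Arrays;

    public class AffinityPropagationExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Similarities are defined in both directions with the same value, i.e. the
            // edge set encodes a symmetric similarity matrix. Self-edges are included
            // here assuming they play the role of the preferences.
            DataSet<Edge<Long, Double>> similarities = env.fromCollection(Arrays.asList(
                new Edge<>(1L, 1L, -2.0), new Edge<>(2L, 2L, -2.0), new Edge<>(3L, 3L, -2.0),
                new Edge<>(1L, 2L, -1.0), new Edge<>(2L, 1L, -1.0),
                new Edge<>(1L, 3L, -4.0), new Edge<>(3L, 1L, -4.0),
                new Edge<>(2L, 3L, -4.0), new Edge<>(3L, 2L, -4.0)));

            Graph<Long, NullValue, Double> graph = Graph.fromDataSet(similarities, env);

            // maxIterations = 30, damping = 0.5, epsilon = 0.001
            DataSet<Tuple2<Long, Long>> pointToExemplar =
                graph.run(new AffinityPropagation(30, 0.5f, 0.001f));

            // f0 is the point id, f1 its exemplar; grouping by f1 gives the clusters.
            pointToExemplar.print();
        }
    }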


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseprupi/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2053.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2053
    
----
commit d3b4cd98fcd07b5bba265baccbb465b6edc22e09
Author: Josep Rubio <jo...@datag.es>
Date:   2016-05-31T02:31:41Z

    Affinity Propagation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r73792319
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run.
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new message
    +	 * has not changed by more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    +			}
    +		}
    +
    +		DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
    +		DataSet<Vertex<Long, APVertexValue>> APVerticesDS = input.getContext().fromCollection(APvertices);
    +
    +		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    +		parameters.registerAggregator("convergedAggregator", new LongSumAggregator());
    +
    +		Graph<Long, APVertexValue, NullValue> APgraph
    +			= Graph.fromDataSet(APVerticesDS, APEdgesDS, input.getContext());
    +
    +		return APgraph.getUndirected().runScatterGatherIteration(new APVertexUpdater(input.numberOfVertices() * 2),
    +			new APMessenger(),this.maxIterations,parameters).getVertices().filter(new FilterFunction<Vertex<Long, APVertexValue>>() {
    +			@Override
    +			public boolean filter(Vertex<Long, APVertexValue> vertex) throws Exception {
    +				return vertex.getId()%2 == 0;
    +			}
    +		}).map(new MapFunction<Vertex<Long, APVertexValue>, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Vertex<Long, APVertexValue> value) throws Exception {
    +				Tuple2<Long, Long> returnValue = new Tuple2<>(value.getId()/10, value.getValue().getExemplar()/10);
    +				return returnValue;
    +			}
    +		});
    +
    +	}
    +
    +	/**
    +	* For each input point we have to create a pair of E,I vertices. The same structure is used for both vertex
    +	* types; the id is used to differentiate E and I vertices. For each input point we will create:
    +	*
    +	* - One E vertex with the id as the original input id * 10 + 1
    +	* - One I vertex with the id as the original input id * 10
    +	*
    +	* This way even ids belong to I type vertices and odd ids to E type vertices.
    +	*
    +	* It also adds the weights to the I vertices. Notice that the S vertices are not created and the weights
    +	* are added to the I vertices, simulating the S vertex.
    +	*/
    +
    +	@SuppressWarnings("serial")
    +	private static final class InitAPVertex implements EdgesFunction<Long, Double, Vertex<Long, APVertexValue>> {
    +
    +		@Override
    +		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
    +								Collector<Vertex<Long, APVertexValue>> out) throws Exception {
    +
    +			Vertex<Long, APVertexValue> APvertexI = new Vertex<>();
    +			Vertex<Long, APVertexValue> APvertexE = new Vertex<>();
    +
    +			Iterator<Tuple2<Long, Edge<Long, Double>>> itr = edges.iterator();
    +			Tuple2<Long, Edge<Long, Double>> edge = itr.next();
    --- End diff --
    
    Nope, the way it is implemented it would be possible to call the init function without edges.
    I'm thinking of changing the input data to be a matrix of similarities, with each of the columns/rows being the points to cluster.
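
    To make that concrete, a hypothetical helper (not part of this PR; class and method names are made up) could turn such a matrix into the Edge DataSet the current implementation consumes:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Edge;

    import java.util.ArrayList;
    import java.util.List;

    public final class SimilarityMatrixInput {

        /** Turns a dense, symmetric similarity matrix into an edge list; row/column i maps to point id i + 1. */
        public static DataSet<Edge<Long, Double>> fromMatrix(double[][] s, ExecutionEnvironment env) {
            List<Edge<Long, Double>> edges = new ArrayList<>();
            for (int i = 0; i < s.length; i++) {
                for (int j = 0; j < s[i].length; j++) {
                    edges.add(new Edge<>(i + 1L, j + 1L, s[i][j]));
                }
            }
            return env.fromCollection(edges);
        }
    }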
    
    Thoughts?



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67871432
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run.
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new message
    +	 * has not changed by more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    +			}
    +		}
    +
    +		DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
    +		DataSet<Vertex<Long, APVertexValue>> APVerticesDS = input.getContext().fromCollection(APvertices);
    +
    +		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    +		parameters.registerAggregator("convergedAggregator", new LongSumAggregator());
    +
    +		Graph<Long, APVertexValue, NullValue> APgraph
    +			= Graph.fromDataSet(APVerticesDS, APEdgesDS, input.getContext());
    +
    +		return APgraph.getUndirected().runScatterGatherIteration(new APVertexUpdater(input.numberOfVertices() * 2),
    +			new APMessenger(),this.maxIterations,parameters).getVertices().filter(new FilterFunction<Vertex<Long, APVertexValue>>() {
    +			@Override
    +			public boolean filter(Vertex<Long, APVertexValue> vertex) throws Exception {
    +				return vertex.getId()%2 == 0;
    +			}
    +		}).map(new MapFunction<Vertex<Long, APVertexValue>, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Vertex<Long, APVertexValue> value) throws Exception {
    +				Tuple2<Long, Long> returnValue = new Tuple2<>(value.getId()/10, value.getValue().getExemplar()/10);
    +				return returnValue;
    +			}
    +		});
    +
    +	}
    +
    +	/**
    +	* For each input point we have to create a pair of E,I vertices. The same structure is used for both vertex
    +	* types; the id is used to differentiate E and I vertices. For each input point we will create:
    +	*
    +	* - One E vertex with the id as the original input id * 10 + 1
    +	* - One I vertex with the id as the original input id * 10
    +	*
    +	* This way even ids belong to I type vertices and odd ids to E type vertices.
    +	*
    +	* It also adds the weights to the I vertices. Notice that the S vertices are not created and the weights
    +	* are added to the I vertices, simulating the S vertex.
    +	*/
    +
    +	@SuppressWarnings("serial")
    +	private static final class InitAPVertex implements EdgesFunction<Long, Double, Vertex<Long, APVertexValue>> {
    +
    +		@Override
    +		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
    +								Collector<Vertex<Long, APVertexValue>> out) throws Exception {
    +
    +			Vertex<Long, APVertexValue> APvertexI = new Vertex<>();
    +			Vertex<Long, APVertexValue> APvertexE = new Vertex<>();
    +
    +			Iterator<Tuple2<Long, Edge<Long, Double>>> itr = edges.iterator();
    +			Tuple2<Long, Edge<Long, Double>> edge = itr.next();
    --- End diff --
    
    Are you sure there's always at least one edge?



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67957847
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run.
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new message
    +	 * has not changed by more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    --- End diff --
    
    I'll rethink this too, along with having a different input.



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67952276
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    --- End diff --
    
    For each input point we need to create two vertex types, and the id is used to differentiate them:
    id*10 is an I type vertex and id*10+1 is an E type vertex, which is the reason for having a numeric id.
    
    The reason for this format is that when debugging it is easy to see which point we are processing and easy to differentiate between an E and an I vertex (odd and even).
    
    Having a matrix instead of a graph as input would avoid having any ID. I'll try to explain it with an example, but again maybe the input format needs to be reviewed.
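
    A tiny illustrative snippet (not from the PR) of the id encoding and decoding described above:

    public class ApIdSchemeDemo {
        public static void main(String[] args) {
            // Each input point p gets an I vertex with id p * 10 (even)
            // and an E vertex with id p * 10 + 1 (odd).
            long point = 7L;
            long iVertexId = point * 10;        // 70 -> I vertex
            long eVertexId = point * 10 + 1;    // 71 -> E vertex

            // Recovering the original point id and the vertex type:
            System.out.println(iVertexId / 10);        // 7
            System.out.println(eVertexId % 2 != 0);    // true -> E vertex
        }
    }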



[GitHub] flink issue #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2053
  
    Hi @joseprupi,
    thanks a lot for this PR.
    
    I have looked through the code and I'm quite worried about its performance and scalability. Have you tried it out with any big inputs? We will definitely have to get rid of the calls to 'collect()' and try to re-think whether we can do without so much state in the auxiliary vertices.
    
    Even though I am quite familiar with the original AP algorithm, I'm not sure I understand all the details of the binary AP. Could you maybe create and explain a small example, for a small input graph and say 2-3 iterations, and show the messages and computations that are required? You can add this to the design document. Thanks!



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67873432
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run.
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new message
    +	 * has not changed by more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    --- End diff --
    
    I would strongly suggest _against_ using `collect()` here. This will send all data back to the client and that's a potentially big dataset you're collecting.
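
    For what it's worth, one possible direction (just a sketch reusing the names from the diff above, not a tested change) is to keep the vertices as a DataSet and pass it to Graph.fromDataSet() directly, skipping the round trip through the client:

    // Inside run(): no collect()/fromCollection() needed for the vertices.
    DataSet<Vertex<Long, APVertexValue>> apVerticesDS =
        input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);

    Graph<Long, APVertexValue, NullValue> apGraph =
        Graph.fromDataSet(apVerticesDS, APEdgesDS, input.getContext());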



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67951222
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    --- End diff --
    
    It does not ignore edge directions; all the edges need to be defined in both directions with the same similarity. I thought that in Gelly all edges have a direction, so to me it's like defining a symmetric similarity matrix. I think I've seen some proposal to have undirected edges but I haven't been following it lately.
    
    I think the input format was one of the questions I raised in the design document, as it is one of the doubts I have had from the beginning. I wanted to follow a "gelly" format and have a graph as input, but maybe it makes more sense to have an array of values.
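
    As a side note, if the source data lists each pair only once, one way (a sketch, not part of the PR) to satisfy the both-directions requirement is Gelly's getUndirected(), which adds a reversed copy of every edge:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.types.NullValue;

    public final class SymmetrizeSimilarities {

        /** Assumes each unordered pair appears exactly once; otherwise reciprocal edges get duplicated. */
        public static Graph<Long, NullValue, Double> symmetrize(
                DataSet<Edge<Long, Double>> oneDirectionEdges, ExecutionEnvironment env) {
            return Graph.fromDataSet(oneDirectionEdges, env).getUndirected();
        }
    }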



[GitHub] flink issue #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the issue:

    https://github.com/apache/flink/pull/2053
  
    I'll shepherd this PR.



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67868911
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    --- End diff --
    
    Do we expect the graph to be undirected or does the implementation ignore edge directions?



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67868917
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    --- End diff --
    
    What are the expected types of the points and the similarities?



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67957755
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that this is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * pairwise similarities between these points.
    + *
    + * The output is a DataSet of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * Tuples grouped by f1.
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">Binary Affinity Propagation paper</a>
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run.
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send a message to a neighbor if the new message
    +	 * has not changed by more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    +			}
    +		}
    +
    +		DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
    +		DataSet<Vertex<Long, APVertexValue>> APVerticesDS = input.getContext().fromCollection(APvertices);
    +
    +		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    +		parameters.registerAggregator("convergedAggregator", new LongSumAggregator());
    +
    +		Graph<Long, APVertexValue, NullValue> APgraph
    +			= Graph.fromDataSet(APVerticesDS, APEdgesDS, input.getContext());
    +
    +		return APgraph.getUndirected().runScatterGatherIteration(new APVertexUpdater(input.numberOfVertices() * 2),
    +			new APMessenger(),this.maxIterations,parameters).getVertices().filter(new FilterFunction<Vertex<Long, APVertexValue>>() {
    +			@Override
    +			public boolean filter(Vertex<Long, APVertexValue> vertex) throws Exception {
    +				return vertex.getId()%2 == 0;
    +			}
    +		}).map(new MapFunction<Vertex<Long, APVertexValue>, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Vertex<Long, APVertexValue> value) throws Exception {
    +				Tuple2<Long, Long> returnValue = new Tuple2<>(value.getId()/10, value.getValue().getExemplar()/10);
    +				return returnValue;
    +			}
    +		});
    +
    +	}
    +
    +	/**
    +	* For each input point we have to create a pair of E,I vertices. The same structure is used for both vertex
    +	* types; the id is used to differentiate E and I vertices. For each input point we will create:
    +	*
    +	* - One E vertex with the id as the original input id * 10 + 1
    +	* - One I vertex with the id as the original input id * 10
    +	*
    +	* This way even ids belong to I type vertices and odd ids to E type vertices.
    +	*
    +	* It also adds the weights to the I vertices. Notice that the S vertices are not created and the weights
    +	* are added to the I vertices, simulating the S vertex.
    +	*/
    +
    +	@SuppressWarnings("serial")
    +	private static final class InitAPVertex implements EdgesFunction<Long, Double, Vertex<Long, APVertexValue>> {
    +
    +		@Override
    +		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
    +								Collector<Vertex<Long, APVertexValue>> out) throws Exception {
    +
    +			Vertex<Long, APVertexValue> APvertexI = new Vertex<>();
    +			Vertex<Long, APVertexValue> APvertexE = new Vertex<>();
    +
    +			Iterator<Tuple2<Long, Edge<Long, Double>>> itr = edges.iterator();
    +			Tuple2<Long, Edge<Long, Double>> edge = itr.next();
    +
    +			APvertexE.setId(edge.f0 * 10 + 1);
    +			APvertexE.setValue(new APVertexValue());
    +
    +			APvertexI.setId(edge.f0 * 10);
    +			APvertexI.setValue(new APVertexValue());
    +			APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +			APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +			APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    +
    +
    +			while(itr.hasNext()){
    +				edge = itr.next();
    +				APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +				APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +				APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    --- End diff --
    
    It creates two hash maps:
    
    - One used for the messages to be sent during the messaging phase. I think this could be solved using a different model, like the vertex-centric one.
    - One used for damping. I don't currently see another way to solve this, as the old values associated with each destination vertex are needed to calculate the damped values.
    
    I'll try to rethink this.
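
    For context, the usual damping update that makes the per-destination old values necessary looks roughly like this (a generic sketch, not the PR's actual code):

    import java.util.HashMap;
    import java.util.Map;

    public class DampingSketch {
        public static void main(String[] args) {
            double damping = 0.9;
            // last value sent to each destination vertex id
            Map<Long, Double> oldValues = new HashMap<>();

            long neighbor = 31L;       // hypothetical destination id
            double newValue = -1.7;    // freshly computed message value

            // blend the new message with the previously sent one
            double previous = oldValues.getOrDefault(neighbor, 0.0);
            double damped = damping * previous + (1 - damping) * newValue;
            oldValues.put(neighbor, damped);

            System.out.println(damped);
        }
    }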



[GitHub] flink pull request: Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi commented on the pull request:

    https://github.com/apache/flink/pull/2053#issuecomment-222650638
  
    Ok, thanks @vasia !



[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67873061
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * similarities of these points among them.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * the Tuples grouped by f1
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation instance algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send message to a neighbor if the new message
    +	 * has not changed more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    +			}
    +		}
    +
    +		DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
    +		DataSet<Vertex<Long, APVertexValue>> APVerticesDS = input.getContext().fromCollection(APvertices);
    +
    +		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    +		parameters.registerAggregator("convergedAggregator", new LongSumAggregator());
    +
    +		Graph<Long, APVertexValue, NullValue> APgraph
    +			= Graph.fromDataSet(APVerticesDS, APEdgesDS, input.getContext());
    +
    +		return APgraph.getUndirected().runScatterGatherIteration(new APVertexUpdater(input.numberOfVertices() * 2),
    +			new APMessenger(),this.maxIterations,parameters).getVertices().filter(new FilterFunction<Vertex<Long, APVertexValue>>() {
    +			@Override
    +			public boolean filter(Vertex<Long, APVertexValue> vertex) throws Exception {
    +				return vertex.getId()%2 == 0;
    +			}
    +		}).map(new MapFunction<Vertex<Long, APVertexValue>, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Vertex<Long, APVertexValue> value) throws Exception {
    +				Tuple2<Long, Long> returnValue = new Tuple2<>(value.getId()/10, value.getValue().getExemplar()/10);
    +				return returnValue;
    +			}
    +		});
    +
    +	}
    +
    +	/**
    +	* Foreach input point we have to create a pair of E,I vertices. Same structure is used for both vertex type, to
    +	* diferenciate E and I vertices is used the id. Foreach input point we will create:
    +	*
    +	* - One E vertex with the id as the original input id * 10 + 1
    +	* - One I vertex with the id as the original input id * 10
    +	*
    +	* This way even ids are from E type vertices and odd ids are from I vertices.
    +	*
    +	* It also calculates adds the weights to the I vertices. Notice that the S vertices are not created and the weights
    +	* are added to the I vertices, simulating the S vertex.
    +	*/
    +
    +	@SuppressWarnings("serial")
    +	private static final class InitAPVertex implements EdgesFunction<Long, Double, Vertex<Long, APVertexValue>> {
    +
    +		@Override
    +		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
    +								Collector<Vertex<Long, APVertexValue>> out) throws Exception {
    +
    +			Vertex<Long, APVertexValue> APvertexI = new Vertex<>();
    +			Vertex<Long, APVertexValue> APvertexE = new Vertex<>();
    +
    +			Iterator<Tuple2<Long, Edge<Long, Double>>> itr = edges.iterator();
    +			Tuple2<Long, Edge<Long, Double>> edge = itr.next();
    +
    +			APvertexE.setId(edge.f0 * 10 + 1);
    +			APvertexE.setValue(new APVertexValue());
    +
    +			APvertexI.setId(edge.f0 * 10);
    +			APvertexI.setValue(new APVertexValue());
    +			APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +			APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +			APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    +
    +
    +			while(itr.hasNext()){
    +				edge = itr.next();
    +				APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +				APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +				APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    --- End diff --
    
    So, for each vertex, you are creating two extra vertices, each of which basically carries two hash maps sized on the order of the number of neighbors, right? This is prohibitively expensive for any graph with mid- or high-degree vertices. Is there any other way to implement this?
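    
    As a rough back-of-envelope (the per-entry cost below is an assumption, not a
    measurement), the footprint of those maps for a single high-degree point:
    
        /** Back-of-envelope only; ~80 bytes per boxed HashMap entry is assumed. */
        public class FootprintSketch {
            public static void main(String[] args) {
                long degree = 1_000_000L;   // one high-degree input point
                long bytesPerEntry = 80L;   // assumed: entry object + boxed Long + boxed Double
                // The E vertex keeps one map and the I vertex keeps two (weights + old
                // values), so roughly 3 * degree entries per input point.
                long bytes = 3L * degree * bytesPerEntry;
                System.out.println((bytes / (1024 * 1024)) + " MB for a single input point");
            }
        }
    
    That is on the order of a couple of hundred MB for one degree-10^6 point, before a
    single message has been sent.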


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r69015248
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    --- End diff --
    
    The matrix can be asymmetric as in Frey and Dueck's example of city flight times. Their paper also discusses sparse graphs.
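    
    For example (a made-up toy input, not from this PR), an asymmetric similarity is
    just a pair of directed edges with different weights; a minimal sketch:
    
        import java.util.ArrayList;
        import java.util.List;
        
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.graph.Edge;
        import org.apache.flink.graph.Graph;
        import org.apache.flink.types.NullValue;
        
        /** A made-up toy input, not part of this PR. */
        public class AsymmetricSimilarityExample {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
                // s(1,2) and s(2,1) differ, which a purely undirected input cannot express.
                List<Edge<Long, Double>> similarities = new ArrayList<>();
                similarities.add(new Edge<>(1L, 2L, -3.0));   // e.g. travel time from 1 to 2
                similarities.add(new Edge<>(2L, 1L, -5.0));   // a different value on the way back
        
                Graph<Long, NullValue, Double> graph = Graph.fromCollection(similarities, env);
                graph.getEdges().print();
            }
        }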


---

[GitHub] flink pull request: Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/2053#issuecomment-222640838
  
    Hi @joseprupi,
    thanks for the PR! Just a heads-up. There is currently a very high number of unreviewed PRs and it might take a while until someone finds the time to take a look. Thanks for your understanding :)


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by joseprupi <gi...@git.apache.org>.
Github user joseprupi closed the pull request at:

    https://github.com/apache/flink/pull/2053


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67873916
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * similarities of these points among them.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * the Tuples grouped by f1
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation instance algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send message to a neighbor if the new message
    +	 * has not changed more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    --- End diff --
    
    Similarly, this is a potentially very large list, which might not fit in memory...
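    
    One way to avoid materializing this list on the client (a sketch, assuming the
    DataSet cross operator and Gelly's getVertexIds(); CrossFunction is
    org.apache.flink.api.common.functions.CrossFunction) would be to generate the
    edges as a DataSet instead (still n^2 edges, but never held in a single JVM):
    
        // Same id encoding as in the PR: I vertex = id * 10, E vertex = id * 10 + 1.
        DataSet<Long> ids = input.getVertexIds();
        DataSet<Edge<Long, NullValue>> apEdges = ids.cross(ids)
            .with(new CrossFunction<Long, Long, Edge<Long, NullValue>>() {
                @Override
                public Edge<Long, NullValue> cross(Long i, Long j) {
                    return new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance());
                }
            });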


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67868962
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * similarities of these points among them.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * the Tuples grouped by f1
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    --- End diff --
    
    Do we need long ids or could we use a generic type? I suppose we do need similarities to be of double type.
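    
    For reference, a generic-key version would roughly have this signature (just a
    sketch; the id arithmetic used above, id * 10 and id * 10 + 1, would have to be
    replaced by an explicit E/I marker before arbitrary key types could work):
    
        // K is the user's vertex id type; similarities stay Double.
        public class AffinityPropagation<K>
                implements GraphAlgorithm<K, NullValue, Double, DataSet<Tuple2<K, K>>> {
        
            @Override
            public DataSet<Tuple2<K, K>> run(Graph<K, NullValue, Double> input) throws Exception {
                // the E/I vertex pair would need a (key, flag) composite id here
                throw new UnsupportedOperationException("signature sketch only");
            }
        }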


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r67869049
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * similarities of these points among them.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * the Tuples grouped by f1
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation instance algorithm instance.
    --- End diff --
    
    remove second "instance"


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r69121871
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    --- End diff --
    
    The matrix can be asymmetric as in Frey and Dueck's example of city flight times. Their paper also discusses sparse graphs.


---

[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2053#discussion_r69121905
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/AffinityPropagation.java ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.LongSumAggregator;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgesFunction;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.HashMap;
    +
    +
    +/**
    + * This is an implementation of the Binary Affinity Propagation algorithm using a scatter-gather iteration.
    + * Note that is not the original Affinity Propagation.
    + *
    + * The input is an undirected graph where the vertices are the points to be clustered and the edge weights are the
    + * similarities of these points among them.
    + *
    + * The output is a Dataset of Tuple2, where f0 is the point id and f1 is the exemplar, so the clusters will be the
    + * the Tuples grouped by f1
    + *
    + * @see <a href="http://www.psi.toronto.edu/pubs2/2009/NC09%20-%20SimpleAP.pdf">
    + */
    +
    +@SuppressWarnings("serial")
    +public class AffinityPropagation implements GraphAlgorithm<Long,NullValue,Double,DataSet<Tuple2<Long, Long>>> {
    +
    +	private static Integer maxIterations;
    +	private static float damping;
    +	private static float epsilon;
    +
    +	/**
    +	 * Creates a new AffinityPropagation instance algorithm instance.
    +	 *
    +	 * @param maxIterations The maximum number of iterations to run
    +	 * @param damping Damping factor.
    +	 * @param epsilon Epsilon factor. Do not send message to a neighbor if the new message
    +	 * has not changed more than epsilon.
    +	 */
    +	public AffinityPropagation(Integer maxIterations, float damping, float epsilon) {
    +		this.maxIterations = maxIterations;
    +		this.damping = damping;
    +		this.epsilon = epsilon;
    +	}
    +
    +	@Override
    +	public DataSet<Tuple2<Long, Long>> run(Graph<Long, NullValue, Double> input) throws Exception {
    +
    +		// Create E and I AP vertices
    +		DataSet<Vertex<Long, APVertexValue>> verticesWithAllInNeighbors =
    +			input.groupReduceOnEdges(new InitAPVertex(), EdgeDirection.IN);
    +
    +		List<Vertex<Long, APVertexValue>> APvertices = verticesWithAllInNeighbors.collect();
    +
    +		// Create E and I AP edges. Could this be done with some gelly functionality?
    +		List<Edge<Long, NullValue>> APedges = new ArrayList<>();
    +
    +		for(int i = 1; i < input.numberOfVertices() + 1; i++){
    +			for(int j = 1; j < input.numberOfVertices() + 1; j++){
    +				APedges.add(new Edge<>(i * 10L, j * 10L + 1, NullValue.getInstance()));
    +			}
    +		}
    +
    +		DataSet<Edge<Long, NullValue>> APEdgesDS = input.getContext().fromCollection(APedges);
    +		DataSet<Vertex<Long, APVertexValue>> APVerticesDS = input.getContext().fromCollection(APvertices);
    +
    +		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    +		parameters.registerAggregator("convergedAggregator", new LongSumAggregator());
    +
    +		Graph<Long, APVertexValue, NullValue> APgraph
    +			= Graph.fromDataSet(APVerticesDS, APEdgesDS, input.getContext());
    +
    +		return APgraph.getUndirected().runScatterGatherIteration(new APVertexUpdater(input.numberOfVertices() * 2),
    +			new APMessenger(),this.maxIterations,parameters).getVertices().filter(new FilterFunction<Vertex<Long, APVertexValue>>() {
    +			@Override
    +			public boolean filter(Vertex<Long, APVertexValue> vertex) throws Exception {
    +				return vertex.getId()%2 == 0;
    +			}
    +		}).map(new MapFunction<Vertex<Long, APVertexValue>, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Vertex<Long, APVertexValue> value) throws Exception {
    +				Tuple2<Long, Long> returnValue = new Tuple2<>(value.getId()/10, value.getValue().getExemplar()/10);
    +				return returnValue;
    +			}
    +		});
    +
    +	}
    +
    +	/**
    +	* Foreach input point we have to create a pair of E,I vertices. Same structure is used for both vertex type, to
    +	* diferenciate E and I vertices is used the id. Foreach input point we will create:
    +	*
    +	* - One E vertex with the id as the original input id * 10 + 1
    +	* - One I vertex with the id as the original input id * 10
    +	*
    +	* This way even ids are from E type vertices and odd ids are from I vertices.
    +	*
    +	* It also calculates adds the weights to the I vertices. Notice that the S vertices are not created and the weights
    +	* are added to the I vertices, simulating the S vertex.
    +	*/
    +
    +	@SuppressWarnings("serial")
    +	private static final class InitAPVertex implements EdgesFunction<Long, Double, Vertex<Long, APVertexValue>> {
    +
    +		@Override
    +		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
    +								Collector<Vertex<Long, APVertexValue>> out) throws Exception {
    +
    +			Vertex<Long, APVertexValue> APvertexI = new Vertex<>();
    +			Vertex<Long, APVertexValue> APvertexE = new Vertex<>();
    +
    +			Iterator<Tuple2<Long, Edge<Long, Double>>> itr = edges.iterator();
    +			Tuple2<Long, Edge<Long, Double>> edge = itr.next();
    +
    +			APvertexE.setId(edge.f0 * 10 + 1);
    +			APvertexE.setValue(new APVertexValue());
    +
    +			APvertexI.setId(edge.f0 * 10);
    +			APvertexI.setValue(new APVertexValue());
    +			APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +			APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +			APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    +
    +
    +			while(itr.hasNext()){
    +				edge = itr.next();
    +				APvertexI.getValue().getWeights().put(edge.f1.getSource() * 10 + 1, edge.f1.getValue());
    +
    +				APvertexE.getValue().getOldValues().put(edge.f1.getSource() * 10, 0.0);
    +				APvertexI.getValue().getOldValues().put(edge.f1.getSource() * 10 + 1, 0.0);
    +
    +			}
    +
    +			out.collect(APvertexE);
    +			out.collect(APvertexI);
    +		}
    +	}
    +
    +	/**
    +	 * Vertex updater
    +	 */
    +
    +	@SuppressWarnings("serial")
    +	public static final class APVertexUpdater extends VertexUpdateFunction<Long, APVertexValue, APMessage> {
    +
    +		private Long numOfVertex;
    +		LongSumAggregator aggregator = new LongSumAggregator();
    +
    +		public APVertexUpdater(Long numOfVertex){
    +			this.numOfVertex = numOfVertex;
    +		}
    +
    +		@Override
    +		public void preSuperstep() throws Exception {
    +
    +			aggregator = getIterationAggregator("convergedAggregator");
    +
    +		}
    +
    +		/**
    +		 * Main vertex update function. It calls updateIVertex, updateEVertex, computeExemplars or computeClusters
    +		 * depending on the phase of the algorithm execution
    +		 */
    +
    +		@Override
    +		public void updateVertex(Vertex<Long, APVertexValue> vertex,
    +								MessageIterator<APMessage> inMessages) {
    +
    +			//If all vertices converged compute the Exemplars
    +
    +			if(getSuperstepNumber() > 1
    +				&& (((LongValue)getPreviousIterationAggregate("convergedAggregator")).getValue()
    +				== numOfVertex|| getSuperstepNumber() == maxIterations-2)) {
    +				computeExemplars(vertex, inMessages);
    +				return;
    +			}
    +
    +			//Once the exemplars have been calculated calculate the clusters. The aggregator has a negative value assigned
    +			//when exemplars are calculated
    +			if(getSuperstepNumber() > 1
    +				&& ((LongValue)getPreviousIterationAggregate("convergedAggregator")).getValue()
    +				< 0) {
    +				if(vertex.getValue().getExemplar() < 0){
    +					computeClusters(vertex, inMessages);
    +				}
    +				return;
    +			}
    +
    +			//Call updateIvertex or updateEvertex depending on the id
    +			if(vertex.getId()%2 == 0){
    --- End diff --
    
    Are the I and E vertex updates not talking past each other? We had the same consideration with HITS, where the computation runs twice in parallel and the updates crisscross.
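    
    One possible way to keep the two phases from interleaving (just an idea sketched
    against the scatter-gather API used here, not something this PR does) is to
    activate only one vertex kind per superstep:
    
        // Inside updateVertex(): odd supersteps update I vertices, even supersteps
        // update E vertices, so the two message streams cannot crisscross.
        boolean isIVertex = vertex.getId() % 2 == 0;      // I ids are even, E ids are odd
        boolean iPhase = getSuperstepNumber() % 2 == 1;
        if (isIVertex == iPhase) {
            // run this vertex kind's update and send its messages
        } else {
            // stay idle this superstep and keep the current value
        }
    
    This would double the number of supersteps, so it trades iteration count for
    keeping the two computations in lockstep.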


---