Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2016/05/02 12:21:49 UTC

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1956

    [FLINK-2044] [gelly] Implementation of Gelly HITS Algorithm

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    - Implement the HITS algorithm by splitting the update of `hubs` and `authorities` into two phases.
    - If users want to find the final good hub pages, they can set the HITSParameter to `HUB`; `AUTHORITY` works the same way for authority pages.
    - Use sum normalization to normalize the vertex values.
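
For readers unfamiliar with HITS, the two-phase update with sum normalization described in the list above can be sketched outside of Gelly roughly as follows. This is a minimal stand-alone illustration, not the code from the pull request: the class `HitsSketch`, the edge-array representation, and the order of the two phases are assumptions made for the example.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; not the Gelly implementation from the pull request.
final class HitsSketch {

    /**
     * Runs HITS on a directed graph given as edges[i] = {source, target}.
     * Returns {hubs, authorities}, each normalized so that its entries sum to 1.
     */
    static double[][] run(int numVertices, int[][] edges, int iterations) {
        double[] hubs = new double[numVertices];
        double[] authorities = new double[numVertices];
        // Assumption: both scores start at the same value on every vertex.
        Arrays.fill(hubs, 1.0);
        Arrays.fill(authorities, 1.0);

        for (int it = 0; it < iterations; it++) {
            // Phase 1: authority score = sum of hub scores of the vertices pointing at it.
            double[] newAuthorities = new double[numVertices];
            for (int[] e : edges) {
                newAuthorities[e[1]] += hubs[e[0]];
            }
            normalizeBySum(newAuthorities);
            authorities = newAuthorities;

            // Phase 2: hub score = sum of authority scores of the vertices it points at.
            double[] newHubs = new double[numVertices];
            for (int[] e : edges) {
                newHubs[e[0]] += authorities[e[1]];
            }
            normalizeBySum(newHubs);
            hubs = newHubs;
        }
        return new double[][] {hubs, authorities};
    }

    /** Sum normalization: divide every entry by the total (no-op when the total is 0). */
    static void normalizeBySum(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        if (sum == 0.0) {
            return;
        }
        for (int i = 0; i < values.length; i++) {
            values[i] /= sum;
        }
    }
}
```

In a scatter-gather setting, each phase would correspond to one superstep, which is presumably why the pull request doubles the user-supplied iteration count.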

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink HITS_Algorithm

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1956.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1956
    
----
commit 96b4ed1e91d08d992f55b849b451eb496d49b5dd
Author: gallenvara <ga...@126.com>
Date:   2016-05-02T09:58:17Z

    Implementation of HITS algorithm.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62493996
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    --- End diff --
    
    As a mutable type, `DoubleValue` results in far fewer object instantiations than Java's immutable `Double`. Normally garbage collection isn't a concern, but for simple algorithms it can have a noticeable impact.
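
The allocation difference mentioned in this comment can be illustrated with a small sketch. `MutableDouble` below is a hypothetical stand-in written for this example so that it runs without Flink on the classpath; it is not Flink's `DoubleValue`, although it mirrors the get/set style of a mutable wrapper.

```java
// Hypothetical stand-in for a mutable wrapper such as Flink's DoubleValue.
final class MutableDouble {
    private double value;

    MutableDouble(double value) {
        this.value = value;
    }

    double getValue() {
        return value;
    }

    void setValue(double value) {
        this.value = value;
    }
}

final class BoxingSketch {

    /** Sums with an immutable Double: every addition boxes the result again. */
    static Double sumBoxed(double[] inputs) {
        Double total = 0.0;
        for (double x : inputs) {
            // Unbox, add, then box: conceptually a new Double per element
            // (the JIT may be able to elide some of these allocations).
            total = total + x;
        }
        return total;
    }

    /** Sums into one caller-supplied mutable holder: no per-element wrapper objects. */
    static MutableDouble sumMutable(double[] inputs, MutableDouble reuse) {
        reuse.setValue(0.0);
        for (double x : inputs) {
            reuse.setValue(reuse.getValue() + x);
        }
        return reuse;
    }
}
```

Both methods compute the same sum; the difference is only in how many wrapper objects are created along the way, which is the cost the comment is pointing at.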



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-219023387
  
    Thank you for the update @gallenvara. I'll take a look soon!



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61780511
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    +		}
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumAllValue", new DoubleSumAggregator());
    +
    +		return netGraph.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +				new MessageUpdate<K>(maxIterations), maxIterations, parameter).getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalize the value.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Double, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumAllValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			if (getSuperstepNumber() != maxIteration) {
    +				setNewVertexValue(updateValue);
    +				doubleSumAggregator.aggregate(updateValue);
    +			} else {
    +				setNewVertexValue(vertex.getValue() / updateValue);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sum all the
    +	 * value in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K> extends MessagingFunction<K, Double, Double, Double> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Double> vertex) {
    +			for (Edge<K, Double> edge : getEdges()) {
    +				if (getSuperstepNumber() % 2 == 1) {
    --- End diff --
    
    The only difference between the if and else is the flipping of source and target, so this could be condensed.
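
One way to read this suggestion is sketched below. The bodies of the two branches are not shown in the quoted diff, so this example only assumes what the comment states: that they differ solely in which edge endpoint plays which role. `DirectionSketch` and `SimpleEdge` are hypothetical names for illustration and are not Gelly types.

```java
final class DirectionSketch {

    /** Minimal edge type for illustration only. */
    static final class SimpleEdge {
        final long source;
        final long target;

        SimpleEdge(long source, long target) {
            this.source = source;
            this.target = target;
        }
    }

    /** Verbose form: two branches that differ only in which endpoint is used. */
    static long messageTargetVerbose(SimpleEdge edge, int superstep) {
        if (superstep % 2 == 1) {
            return edge.target;
        } else {
            return edge.source;
        }
    }

    /** Condensed form: decide the direction once, then select the endpoint. */
    static long messageTarget(SimpleEdge edge, int superstep) {
        boolean forward = superstep % 2 == 1;
        return forward ? edge.target : edge.source;
    }
}
```

With the endpoint selected up front, the rest of the message-sending logic can be written once instead of being duplicated in both branches.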



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221504600
  
    Thank you @gallenvara. I'll merge later today.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64056511
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -273,4 +289,23 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			return initVertexValue;
     		}
     	}
    +
    +	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, String> {
    --- End diff --
    
    I mean that they had a small difference in running time.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62514639
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +	private int maxIterations;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 */
    +	public HITSAlgorithm(int maxIterations) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
    +
    +		return netGraph
    +				.mapVertices(new VertexInitMapper<K, VV>())
    +				.mapEdges(new NullValueEdgeMapper<K, EV>())
    +				.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +						new MessageUpdate<K, NullValue>(maxIterations), maxIterations, parameter)
    +				.getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalize the value.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<Double, Double>, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumVertexValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Tuple2<Double, Double>> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			// in the first iteration, no aggregation to call, init sum with value of vertex
    +			double iterationValueSum = 1.0;
    +
    +			if (getSuperstepNumber() > 1) {
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +			}
    +			if (getSuperstepNumber() != maxIteration) {
    +				if (getSuperstepNumber() % 2 == 1) {
    +					setNewVertexValue(new Tuple2<Double, Double>(vertex.getValue().f0 / iterationValueSum, updateValue));
    +					doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +				} else {
    +					setNewVertexValue(new Tuple2<Double, Double>(updateValue, vertex.getValue().f1 / iterationValueSum));
    +					doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +				}
    +			} else {
    +
    +				//final iteration to normalize hub score
    +				setNewVertexValue(new Tuple2<Double, Double>(vertex.getValue().f0 / iterationValueSum, vertex.getValue().f1));
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sum all the
    +	 * value in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K, EV> extends MessagingFunction<K, Tuple2<Double, Double>, Double, EV> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Tuple2<Double, Double>> vertex) {
    +
    +			// in the first iteration, no aggregation to call, init sum with value of vertex
    +			double iterationValueSum = 1.0;
    +
    +			if (getSuperstepNumber() > 1) {
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +			}
    +			for (Edge<K, EV> edge : getEdges()) {
    +				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    +				K messageTarget = getSuperstepNumber() % 2 == 1 ? edge.getTarget() : edge.getSource();
    +				Double messageValue = getSuperstepNumber() % 2 == 1 ? vertex.getValue().f0 : vertex.getValue().f1;
    +
    +				if (!messageTarget.equals(vertex.getId())) {
    +					if (getSuperstepNumber() != maxIteration) {
    +						sendMessageTo(messageTarget, messageValue / iterationValueSum);
    +
    +						// in order to make every vertex updated
    +						sendMessageTo(messageSource, 0.0);
    +					} else {
    +						sendMessageTo(messageSource, iterationValueSum);
    +					}
    +				}
    +			}
    +		}
    +	}
    +
    +	public static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<Double, Double>> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		public Tuple2<Double, Double> map(Vertex<K, VV> value) {
    +			
    +			//init hub and authority value of each vertex
    +			return new Tuple2<Double, Double>(1.0, 1.0);
    --- End diff --
    
    Fixed.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1956



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62518520
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    --- End diff --
    
    When object reuse is enabled, `DoubleValue` will deserialize into the same object (or one of a few objects), whereas `Double` will be deserialized into a new `Double` object each time. Compare the `deserialize(T, DataInputView)` methods of `DoubleSerializer` and `DoubleValueSerializer`.
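
The contrast between the two `deserialize` styles can be sketched without Flink as follows. This is an assumption-laden illustration, not the source of `DoubleSerializer` or `DoubleValueSerializer`: `ReuseSketch` and `Holder` are hypothetical names, and `java.nio.ByteBuffer` stands in for Flink's `DataInputView`.

```java
import java.nio.ByteBuffer;

final class ReuseSketch {

    /** Hypothetical mutable holder standing in for a type like DoubleValue. */
    static final class Holder {
        double value;
    }

    /**
     * Immutable style: a Double cannot be updated in place, so the reuse
     * argument is ignored and the value read is boxed into a Double.
     */
    static Double deserializeBoxed(Double reuse, ByteBuffer source) {
        return source.getDouble();
    }

    /** Mutable style: overwrite the reuse instance and hand the same object back. */
    static Holder deserializeInto(Holder reuse, ByteBuffer source) {
        reuse.value = source.getDouble();
        return reuse;
    }
}
```

Reading many records through `deserializeInto` with a single `Holder` touches one object throughout, whereas `deserializeBoxed` yields a boxed result per record.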



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61925324
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K, EV> implements GraphAlgorithm<K, Double, EV, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		this(maxIterations, hitsParameter);
    +		Preconditions.checkArgument(numberOfVertices > 0, "The number of vertices in graph should be greater than 0.");
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, EV> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    --- End diff --
    
    Where do we use the number of vertices?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63765556
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -122,22 +179,45 @@ public void updateVertex(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex, Mes
     			DoubleValue newAuthorityValue = vertex.getValue().f1;
     
     			if (getSuperstepNumber() > 1) {
    -				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
    -			if (getSuperstepNumber() != maxIteration) {
    +			if (getSuperstepNumber() < maxIteration) {
     				if (getSuperstepNumber() % 2 == 1) {
    -					newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
    -					newAuthorityValue.setValue(updateValue);
    +
    +					//in the first iteration, the diff is the authority value of each vertex
    +					double previousAuthAverage = 1.0;
    +					double diffValueSum = 1.0 * numberOfVertices;
    +					if (getSuperstepNumber() > 1) {
    +						previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
    +						diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue();
    +					}
    +					authoritySumAggregator.aggregate(previousAuthAverage);
    +					
    +					if (diffValueSum > convergeThreshold) {
    +						newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
    +						newAuthorityValue.setValue(updateValue);
    +					} else {
    +
    +						//scores are converged and stop iteration
    +						maxIteration = getSuperstepNumber();
    --- End diff --
    
    I don't think this assignment has any effect?


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62868637
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represents a page that points to many other pages, and a good authority
    + * represented a page that is linked by many different hubs.
    + * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
    + * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * <p>
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +	private int maxIterations;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 */
    +	public HITSAlgorithm(int maxIterations) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
    +
    +		return netGraph
    +				.mapVertices(new VertexInitMapper<K, VV>())
    +				.mapEdges(new NullValueEdgeMapper<K, EV>())
    --- End diff --
    
    Yes, I will modify the code.


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61780238
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    +		}
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumAllValue", new DoubleSumAggregator());
    +
    +		return netGraph.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +				new MessageUpdate<K>(maxIterations), maxIterations, parameter).getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalize the value.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Double, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumAllValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			if (getSuperstepNumber() != maxIteration) {
    +				setNewVertexValue(updateValue);
    +				doubleSumAggregator.aggregate(updateValue);
    +			} else {
    +				setNewVertexValue(vertex.getValue() / updateValue);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sum all the
    +	 * value in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K> extends MessagingFunction<K, Double, Double, Double> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Double> vertex) {
    +			for (Edge<K, Double> edge : getEdges()) {
    +				if (getSuperstepNumber() % 2 == 1) {
    +					if (edge.getTarget() != vertex.getId()) {
    +						double value = vertex.getValue();
    +
    +						// in the first iteration, no aggregation to call, init sum with value of vertex
    +						double sum = 1;
    +						if (getSuperstepNumber() > 1) {
    +							sum = ((DoubleValue) getPreviousIterationAggregate("sumAllValue")).getValue();
    --- End diff --
    
    We could move this out of the loop.
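    
    A minimal sketch of the suggestion, written in plain Java rather than against the Gelly API. `HoistAggregateSketch` and the `DoubleSupplier` standing in for `getPreviousIterationAggregate` are hypothetical names used only for illustration; the point is that the aggregate does not change within a superstep, so it can be read once before iterating over the edges.

```java
import java.util.List;
import java.util.function.DoubleSupplier;

final class HoistAggregateSketch {

    // Original shape: the aggregate is looked up once per edge.
    static double sendPerEdge(List<Integer> targets, double vertexValue, DoubleSupplier previousAggregate) {
        double sent = 0;
        for (int target : targets) {
            double sum = previousAggregate.getAsDouble(); // repeated lookup
            sent += vertexValue / sum;
        }
        return sent;
    }

    // Suggested shape: the lookup is hoisted out of the loop.
    static double sendHoisted(List<Integer> targets, double vertexValue, DoubleSupplier previousAggregate) {
        double sum = previousAggregate.getAsDouble(); // single lookup
        double sent = 0;
        for (int target : targets) {
            sent += vertexValue / sum;
        }
        return sent;
    }
}
```

    Both methods return the same value; only the number of lookups differs.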


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216433392
  
    @greghogan @vasia thanks a lot for your review; the code has been modified. :)


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62863557
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represents a page that points to many other pages, and a good authority
    + * represented a page that is linked by many different hubs.
    + * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
    + * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * <p>
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +	private int maxIterations;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 */
    +	public HITSAlgorithm(int maxIterations) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    --- End diff --
    
    Change `Double` to `DoubleValue`?


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64054001
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -273,4 +289,23 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			return initVertexValue;
     		}
     	}
    +
    +	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, String> {
    --- End diff --
    
    Ah, when I proposed to label the edges as "authority" or "hub", I didn't really mean to add a `String` label :)
    We can do this with a boolean. @greghogan what do you think?


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62514632
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    --- End diff --
    
    I don't quite understand why using `DoubleValue` would result in far fewer object instantiations in this implementation.
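    
    One way to picture the difference, as a sketch only: a mutable holder can be updated in place and handed on, whereas a boxed `Double` is immutable, so every new score needs a new object. `MutableDouble` below is a hypothetical stand-in for a mutable value type such as `DoubleValue` (which exposes `getValue`/`setValue`); it is not the Flink class itself.

```java
final class MutableDoubleSketch {

    // Hypothetical mutable holder, standing in for a type like DoubleValue.
    static final class MutableDouble {
        private double value;

        MutableDouble(double value) {
            this.value = value;
        }

        double getValue() {
            return value;
        }

        void setValue(double value) {
            this.value = value;
        }
    }

    // Updates the holder in place and returns the same instance: no allocation.
    static MutableDouble scale(MutableDouble reuse, double factor) {
        reuse.setValue(reuse.getValue() * factor);
        return reuse;
    }

    // Boxed variant: the result has to be wrapped in a fresh Double object.
    static Double scaleBoxed(Double input, double factor) {
        return input * factor;
    }
}
```

    In a vertex update that runs once per vertex per superstep, the in-place variant avoids one allocation per call.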


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216961955
  
    @vasia likely has additional opinions and insight for the following ...
    
    1) The algorithm should return both the hub score and the authority score. This requires unrolling an additional half-step after the iteration concludes. If we start by computing authority, then alternately compute hub and authority in the iteration (such that authority is the iteration output), then we need to do one further computation of hub, which can be outer-joined with the authority.
    
    2) Would this be better as a GSA algorithm which would use a combiner to reduce the scores? [https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html#iteration-abstractions-comparison]
    
    3) I don't see that the input vertex values are used. If these require an initial type or value (is this mandated by the scatter-gather API?) then we can parameterize the algorithm and translate the vertices to the proper type and/or value using `Graph.translateVertexValues`.
    
    4) Same for edge values, which can be translated to `NullValue`.
    
    5) I'm assuming we can use a convergence threshold.
    
    6) From what I have read, the normalization is performed by dividing by the root-sum-square.
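    
    To make points 1) and 6) concrete, here is a single-machine sketch in plain Java, not the Gelly implementation. `HitsSketch`, the `int[][]` edge list of `{source, target}` pairs, and the choice to compute authority and hub in the same round (instead of alternating supersteps) are all assumptions made for illustration. Each score vector is divided by its root-sum-square, and both vectors are returned.

```java
import java.util.Arrays;

final class HitsSketch {

    // Returns {hubScores, authorityScores} after maxIterations rounds.
    static double[][] run(int numVertices, int[][] edges, int maxIterations) {
        double[] hub = new double[numVertices];
        double[] auth = new double[numVertices];
        Arrays.fill(hub, 1.0); // all vertices start with the same score

        for (int i = 0; i < maxIterations; i++) {
            // Authority update: sum the hub scores of vertices linking in.
            double[] nextAuth = new double[numVertices];
            for (int[] e : edges) {
                nextAuth[e[1]] += hub[e[0]];
            }
            normalize(nextAuth);

            // Hub update: sum the authority scores of vertices linked to.
            double[] nextHub = new double[numVertices];
            for (int[] e : edges) {
                nextHub[e[0]] += nextAuth[e[1]];
            }
            normalize(nextHub);

            auth = nextAuth;
            hub = nextHub;
        }
        return new double[][] {hub, auth};
    }

    // Divides every entry by the root-sum-square of the vector.
    static void normalize(double[] scores) {
        double sumOfSquares = 0;
        for (double s : scores) {
            sumOfSquares += s * s;
        }
        double norm = Math.sqrt(sumOfSquares);
        if (norm == 0) {
            return; // no edges contributed; leave the zeros untouched
        }
        for (int i = 0; i < scores.length; i++) {
            scores[i] /= norm;
        }
    }
}
```

    On the edge list `{0,1}, {0,2}, {1,2}`, vertex 2 ends up with the highest authority score and vertex 0 with the highest hub score, and the squared scores of each vector sum to 1.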


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-217419465
  
    Hi @gallenvara, @greghogan,
    
    - If it's possible to return both the hub and the authority value, I'd prefer that.
    
    - GSA iterations allow setting the edge direction as Greg suggested. I'm not sure how much of a difference the combiner would make. Also, we've seen that for some graphs scatter-gather performs better. Personally, I would be fine with a first scatter-gather version of the algorithm. We can run some tests to see whether GSA would be faster later.
    
    - I agree that edge values should be internally set to `NullValue` if not used.


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-217527593
  
    Hi @vasia, @greghogan,
    - The code has been modified and now returns both the hub and authority scores as a `Tuple2`. The implementation runs one extra iteration at the end to normalize the hub value. With the scatter-gather version implemented, the GSA version should be easy to write.
    - As for the threshold, the algorithm would have to compare consecutive hub (or authority) iterations to check whether any vertex is still being updated. That is a little difficult and, in my view, the `maxIteration` parameter plays a similar role to a threshold.
    - I will open another issue for the GSA version and the relevant tests.
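    
    For reference, the comparison a threshold would need can be sketched outside the iteration API as follows. `ConvergenceSketch` and its method names are hypothetical; the sketch assumes the previous and current score vectors of the same kind (hub or authority) are both available, which is the part that is awkward inside a scatter-gather superstep.

```java
final class ConvergenceSketch {

    // Sum of absolute per-vertex differences between two score vectors.
    static double l1Diff(double[] previous, double[] current) {
        double diff = 0;
        for (int i = 0; i < previous.length; i++) {
            diff += Math.abs(current[i] - previous[i]);
        }
        return diff;
    }

    // The scores count as converged once the total change is within the threshold.
    static boolean hasConverged(double[] previous, double[] current, double threshold) {
        return l1Diff(previous, current) <= threshold;
    }
}
```

    With such a check, `maxIteration` would act only as an upper bound on the number of rounds.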


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63856640
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -122,22 +179,45 @@ public void updateVertex(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex, Mes
     			DoubleValue newAuthorityValue = vertex.getValue().f1;
     
     			if (getSuperstepNumber() > 1) {
    -				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
    -			if (getSuperstepNumber() != maxIteration) {
    +			if (getSuperstepNumber() < maxIteration) {
     				if (getSuperstepNumber() % 2 == 1) {
    -					newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
    -					newAuthorityValue.setValue(updateValue);
    +
    +					//in the first iteration, the diff is the authority value of each vertex
    +					double previousAuthAverage = 1.0;
    +					double diffValueSum = 1.0 * numberOfVertices;
    +					if (getSuperstepNumber() > 1) {
    +						previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
    +						diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue();
    +					}
    +					authoritySumAggregator.aggregate(previousAuthAverage);
    +					
    +					if (diffValueSum > convergeThreshold) {
    +						newHubValue.setValue(newHubValue.getValue() / iterationValueSum);
    +						newAuthorityValue.setValue(updateValue);
    +					} else {
    +
    +						//scores are converged and stop iteration
    +						maxIteration = getSuperstepNumber();
    --- End diff --
    
    This line stops the iteration after the last vertex update (the final hub normalization). If this line is dropped, the iteration continues until `getSuperstepNumber() == maxIteration`, because there are always some vertices that can be updated.


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62493213
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +	private int maxIterations;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 */
    +	public HITSAlgorithm(int maxIterations) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
    +
    +		return netGraph
    +				.mapVertices(new VertexInitMapper<K, VV>())
    +				.mapEdges(new NullValueEdgeMapper<K, EV>())
    +				.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +						new MessageUpdate<K, NullValue>(maxIterations), maxIterations, parameter)
    +				.getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalize the value.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<Double, Double>, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumVertexValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Tuple2<Double, Double>> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			// in the first iteration, no aggregation to call, init sum with value of vertex
    +			double iterationValueSum = 1.0;
    +
    +			if (getSuperstepNumber() > 1) {
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +			}
    +			if (getSuperstepNumber() != maxIteration) {
    +				if (getSuperstepNumber() % 2 == 1) {
    +					setNewVertexValue(new Tuple2<Double, Double>(vertex.getValue().f0 / iterationValueSum, updateValue));
    +					doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +				} else {
    +					setNewVertexValue(new Tuple2<Double, Double>(updateValue, vertex.getValue().f1 / iterationValueSum));
    +					doubleSumAggregator.aggregate(Math.pow(updateValue, 2));
    +				}
    +			} else {
    +
    +				//final iteration to normalize hub score
    +				setNewVertexValue(new Tuple2<Double, Double>(vertex.getValue().f0 / iterationValueSum, vertex.getValue().f1));
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sum all the
    +	 * value in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K, EV> extends MessagingFunction<K, Tuple2<Double, Double>, Double, EV> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Tuple2<Double, Double>> vertex) {
    +
    +			// in the first iteration, no aggregation to call, init sum with value of vertex
    +			double iterationValueSum = 1.0;
    +
    +			if (getSuperstepNumber() > 1) {
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +			}
    +			for (Edge<K, EV> edge : getEdges()) {
    +				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    +				K messageTarget = getSuperstepNumber() % 2 == 1 ? edge.getTarget() : edge.getSource();
    +				Double messageValue = getSuperstepNumber() % 2 == 1 ? vertex.getValue().f0 : vertex.getValue().f1;
    +
    +				if (!messageTarget.equals(vertex.getId())) {
    +					if (getSuperstepNumber() != maxIteration) {
    +						sendMessageTo(messageTarget, messageValue / iterationValueSum);
    +
    +						// in order to make every vertex updated
    +						sendMessageTo(messageSource, 0.0);
    +					} else {
    +						sendMessageTo(messageSource, iterationValueSum);
    +					}
    +				}
    +			}
    +		}
    +	}
    +
    +	public static class VertexInitMapper<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<Double, Double>> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		public Tuple2<Double, Double> map(Vertex<K, VV> value) {
    +			
    +			//init hub and authority value of each vertex
    +			return new Tuple2<Double, Double>(1.0, 1.0);
    --- End diff --
    
    This object can be created as a member variable so that only a single object is instantiated.
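
A minimal sketch of the suggestion above, written without Flink so it compiles on its own. `Pair` and `Mapper` are hypothetical stand-ins for Flink's `Tuple2` and `MapFunction`, and the class names are made up for illustration. Whether returning a shared instance is safe in the real job depends on how downstream operators treat the returned reference, which is an assumption here.

```java
// Hypothetical stand-in for Flink's Tuple2<Double, Double>.
final class Pair {
    double f0;
    double f1;

    Pair(double f0, double f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical stand-in for Flink's MapFunction.
interface Mapper<I, O> {
    O map(I input);
}

// Allocates a fresh Pair for every vertex, as in the diff under review.
final class AllocatingInitMapper implements Mapper<String, Pair> {
    @Override
    public Pair map(String vertexId) {
        return new Pair(1.0, 1.0);
    }
}

// Creates the initial (hub, authority) value once and returns the same instance each time.
final class ReusingInitMapper implements Mapper<String, Pair> {
    private final Pair initialScores = new Pair(1.0, 1.0);

    @Override
    public Pair map(String vertexId) {
        return initialScores;
    }
}

final class InitMapperDemo {
    public static void main(String[] args) {
        Mapper<String, Pair> reusing = new ReusingInitMapper();
        Pair a = reusing.map("v1");
        Pair b = reusing.map("v2");
        System.out.println("same instance: " + (a == b));
    }
}
```

The reusing variant trades one allocation per vertex for a shared mutable object, so it only fits when nothing mutates or retains the returned value between calls.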


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221616393
  
    @vasia The failures in the Travis reports are related to the flink-yarn-tests module. I have merged the latest code from master and rebased all my commits in this PR. `HITSAlgorithmITCase` runs successfully locally.


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61986359
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K, EV> implements GraphAlgorithm<K, Double, EV, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		this(maxIterations, hitsParameter);
    +		Preconditions.checkArgument(numberOfVertices > 0, "The number of vertices in graph should be greater than 0.");
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, EV> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    --- End diff --
    
    I'm sorry, I forgot to remove it. `Sum normalization` does not need the number of vertices; that is only necessary for `z-score normalization`. I have submitted a new commit for this.
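
To illustrate why only one of the two normalizations needs the vertex count, here is a small sketch in plain Java. The class and method names are hypothetical, and both methods assume a non-zero sum and a non-zero standard deviation. The quoted diffs elsewhere in this thread divide by the square root of the aggregated squares rather than by the plain sum; in both cases a single aggregate over the scores is enough, with no count involved.

```java
final class Normalization {
    // Sum normalization: each score divided by the total. No vertex count is needed.
    static double[] sumNormalize(double[] scores) {
        double sum = 0.0;
        for (double s : scores) {
            sum += s;
        }
        double[] out = new double[scores.length];
        for (int i = 0; i < scores.length; i++) {
            out[i] = scores[i] / sum;
        }
        return out;
    }

    // z-score normalization: (score - mean) / stddev. The mean and the
    // (population) standard deviation both depend on the number of vertices n.
    static double[] zScoreNormalize(double[] scores) {
        int n = scores.length;
        double sum = 0.0;
        for (double s : scores) {
            sum += s;
        }
        double mean = sum / n;
        double squaredDeviation = 0.0;
        for (double s : scores) {
            squaredDeviation += (s - mean) * (s - mean);
        }
        double stdDev = Math.sqrt(squaredDeviation / n);
        double[] out = new double[n];
        for (int i = 0; i < n; i++) {
            out[i] = (scores[i] - mean) / stdDev;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(sumNormalize(new double[] {1.0, 1.0, 2.0})));
        System.out.println(java.util.Arrays.toString(zScoreNormalize(new double[] {1.0, 3.0})));
    }
}
```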


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216251223
  
    @vasia I have modified the code and added an extra iteration that reads the aggregated value of the previous iteration to normalize all vertices. :)


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63765397
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -46,40 +46,89 @@
      * represented a page that is linked by many different hubs.
      * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
      * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
      * <p>
      *
      * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
      */
     public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {
     
    +	private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
    +	private final static double MINIMUMTHRESHOLD = 1e-9;
    +
     	private int maxIterations;
    +	private long numberOfVertices;
    +	private double convergeThreshold;
    +
    +	public HITSAlgorithm(int maxIterations) {
    +		this(maxIterations, MINIMUMTHRESHOLD);
    +	}
    +
    +	public HITSAlgorithm(double convergeThreshold) {
    +		this(MAXIMUMITERATION, convergeThreshold);
    +	}
    +
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices) {
    +		this(maxIterations, MINIMUMTHRESHOLD, numberOfVertices);
    +	}
    +
    +	public HITSAlgorithm(double convergeThreshold, long numberOfVertices) {
    +		this(MAXIMUMITERATION, convergeThreshold, numberOfVertices);
    +	}
     
     	/**
     	 * Creates an instance of HITS algorithm.
     	 *
    -	 * @param maxIterations the maximum number of iterations
    +	 * @param maxIterations     the maximum number of iterations
    +	 * @param convergeThreshold convergence threshold for sum of scores
    --- End diff --
    
    Can you extend the description of the `convergeThreshold` argument a bit? "Sum of scores" is not very clear imo.
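
One possible reading of the threshold, sketched in plain Java rather than Gelly: stop once the summed absolute change of the authority scores between two rounds drops to the threshold or below. This interpretation is an assumption drawn from the `diffValueSum > convergeThreshold` check in the diff, not a confirmed description of the PR. `HitsSketch`, `Result`, and the edge-list representation are hypothetical. Scores are divided by the square root of the sum of squares, mirroring the `Math.sqrt` of the aggregate in the quoted code.

```java
import java.util.Arrays;

final class HitsSketch {
    static final class Result {
        final double[] hubs;
        final double[] authorities;
        final int iterations;

        Result(double[] hubs, double[] authorities, int iterations) {
            this.hubs = hubs;
            this.authorities = authorities;
            this.iterations = iterations;
        }
    }

    // edges[i] = {source, target}; vertices are numbered 0..numVertices-1.
    static Result run(int numVertices, int[][] edges, int maxIterations, double convergeThreshold) {
        double[] hub = new double[numVertices];
        double[] auth = new double[numVertices];
        Arrays.fill(hub, 1.0);
        Arrays.fill(auth, 1.0);
        int iterations = 0;

        for (int it = 1; it <= maxIterations; it++) {
            iterations = it;

            // Authority update: sum the hub scores of all in-neighbors.
            double[] newAuth = new double[numVertices];
            for (int[] e : edges) {
                newAuth[e[1]] += hub[e[0]];
            }
            normalize(newAuth);

            // Hub update: sum the new authority scores of all out-neighbors.
            double[] newHub = new double[numVertices];
            for (int[] e : edges) {
                newHub[e[0]] += newAuth[e[1]];
            }
            normalize(newHub);

            // Convergence measure: total absolute change of the authority scores.
            double diffSum = 0.0;
            for (int i = 0; i < numVertices; i++) {
                diffSum += Math.abs(newAuth[i] - auth[i]);
            }
            auth = newAuth;
            hub = newHub;
            if (diffSum <= convergeThreshold) {
                break;
            }
        }
        return new Result(hub, auth, iterations);
    }

    // Divides every entry by the square root of the sum of squares (no-op for an all-zero vector).
    private static void normalize(double[] v) {
        double squares = 0.0;
        for (double x : v) {
            squares += x * x;
        }
        double norm = Math.sqrt(squares);
        if (norm == 0.0) {
            return;
        }
        for (int i = 0; i < v.length; i++) {
            v[i] /= norm;
        }
    }

    public static void main(String[] args) {
        Result r = run(3, new int[][] {{0, 2}, {1, 2}}, 100, 1e-9);
        System.out.println("iterations=" + r.iterations
                + " hubs=" + Arrays.toString(r.hubs)
                + " authorities=" + Arrays.toString(r.authorities));
    }
}
```

Under this reading, the Javadoc could describe the argument as the maximum total change in authority scores between two consecutive rounds that still counts as "not converged".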


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63354317
  
    --- Diff: flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.examples.data.HITSData;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +
    +@RunWith(Parameterized.class)
    +public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
    --- End diff --
    
    In the `MultipleProgramsTestBase`, the default test mode is `COLLECTION`. Should we specify the mode manually?


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62863450
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represents a page that points to many other pages, and a good authority
    + * represented a page that is linked by many different hubs.
    + * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
    + * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * <p>
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<Double, Double>>>> {
    +
    +	private int maxIterations;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 */
    +	public HITSAlgorithm(int maxIterations) {
    +		Preconditions.checkArgument(maxIterations > 0, "The number of maximum iteration should be greater than 0.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<Double, Double>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumVertexValue", new DoubleSumAggregator());
    +
    +		return netGraph
    +				.mapVertices(new VertexInitMapper<K, VV>())
    +				.mapEdges(new NullValueEdgeMapper<K, EV>())
    --- End diff --
    
    Mapping to `NullValue` is not necessary if the edge values are already `NullValue`. Based on the code in `Translate`, we can `TypeInformation<EV> typeInfo = ((TupleTypeInfo<Edge<K, EV>>) netGraph.getEdges().getType()).getTypeAt(2);` and then `if (typeInfo.getTypeClass().equals(NullValue.class)) ...`


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64056167
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -243,18 +255,22 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
     			for (Edge<K, EV> edge : getEdges()) {
    -				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    -				K messageTarget = getSuperstepNumber() % 2 == 1 ? edge.getTarget() : edge.getSource();
    -				double messageValue = getSuperstepNumber() % 2 == 1 ? vertex.getValue().f0.getValue() : vertex.getValue().f1.getValue();
    -
    -				if (!messageTarget.equals(vertex.getId())) {
    -					if (getSuperstepNumber() != maxIteration) {
    -						sendMessageTo(messageTarget, messageValue / iterationValueSum);
    -
    -						// in order to make every vertex updated
    -						sendMessageTo(messageSource, 0.0);
    +				if (getSuperstepNumber() != maxIteration) {
    +					if (getSuperstepNumber() % 2 == 1) {
    +						if (edge.getValue().equals("Authority")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum);
    +						}
     					} else {
    -						sendMessageTo(messageSource, iterationValueSum);
    +						if (edge.getValue().equals("Hub")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum);
    +						}
    +					}
    +					
    +					// make all the vertices be updated
    +					sendMessageTo(edge.getSource(), 0.0);
    --- End diff --
    
    Fixed by rebasing the previous commit.


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216222300
  
    Hi @gallenvara,
    
    thank you for the PR! I only took a quick look, but I will go through the changes in detail soon.
    One thing I noticed is that you use static variables for performing the normalization. This won't work in a distributed environment. You will need to use aggregators instead. Take a look at `IterationConfiguration.registerAggregator`. `VertexUpdateFunction` and `MessagingFunction` have methods to retrieve an aggregator and an aggregated value from the previous superstep. Let me know if you need help :)
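
The point about static variables can be sketched with a small simulation in plain Java. A static field lives once per JVM, so in a distributed run each worker would see only the partial sum of its own partition. An aggregator collects partial sums per task, and the runtime merges them at the superstep barrier and exposes the result in the next superstep. `SumAggregator` and `SuperstepSimulation` below are hypothetical stand-ins, not Flink's `DoubleSumAggregator` or its iteration runtime.

```java
// Hypothetical stand-in for a per-task sum aggregator.
final class SumAggregator {
    private double sum;

    void aggregate(double value) {
        sum += value;
    }

    double getAggregate() {
        return sum;
    }
}

final class SuperstepSimulation {
    // Each inner array plays the role of one parallel partition of vertex values.
    static double[][] normalizeInTwoSupersteps(double[][] partitions) {
        // Superstep 1: every partition aggregates its squared values locally.
        // The "runtime" merges the partial sums at the superstep barrier.
        double merged = 0.0;
        for (double[] partition : partitions) {
            SumAggregator local = new SumAggregator();
            for (double value : partition) {
                local.aggregate(value * value);
            }
            merged += local.getAggregate();
        }

        // Superstep 2: every partition reads the merged aggregate of the
        // previous superstep and normalizes its own values with it.
        double norm = Math.sqrt(merged);
        double[][] normalized = new double[partitions.length][];
        for (int p = 0; p < partitions.length; p++) {
            normalized[p] = new double[partitions[p].length];
            for (int i = 0; i < partitions[p].length; i++) {
                normalized[p][i] = partitions[p][i] / norm;
            }
        }
        return normalized;
    }

    public static void main(String[] args) {
        double[][] result = normalizeInTwoSupersteps(new double[][] {{3.0}, {4.0}});
        System.out.println(result[0][0] + " " + result[1][0]);
    }
}
```

With partitions `{3.0}` and `{4.0}`, a per-JVM static sum would give each worker a different normalizer (9 or 16 before the square root), whereas the merged aggregate of 25 yields one shared normalizer.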


---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61865225
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    The edge value is not used anywhere in the process. IMO it would be better to hard-code the edge value type to `NullValue`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64051693
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -243,18 +255,22 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
     			for (Edge<K, EV> edge : getEdges()) {
    -				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    -				K messageTarget = getSuperstepNumber() % 2 == 1 ? edge.getTarget() : edge.getSource();
    -				double messageValue = getSuperstepNumber() % 2 == 1 ? vertex.getValue().f0.getValue() : vertex.getValue().f1.getValue();
    -
    -				if (!messageTarget.equals(vertex.getId())) {
    -					if (getSuperstepNumber() != maxIteration) {
    -						sendMessageTo(messageTarget, messageValue / iterationValueSum);
    -
    -						// in order to make every vertex updated
    -						sendMessageTo(messageSource, 0.0);
    +				if (getSuperstepNumber() != maxIteration) {
    +					if (getSuperstepNumber() % 2 == 1) {
    +						if (edge.getValue().equals("Authority")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum);
    +						}
     					} else {
    -						sendMessageTo(messageSource, iterationValueSum);
    +						if (edge.getValue().equals("Hub")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum);
    +						}
    +					}
    +					
    +					// make all the vertices be updated
    +					sendMessageTo(edge.getSource(), 0.0);
    --- End diff --
    
    Why do we need to send this message?



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63346546
  
    --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.examples.data;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.types.NullValue;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Provides the data set used for the HITS test program.
    + */
    +public class HITSData {
    +
    +	public static final String VALUE_AFTER_3_ITERATIONS = "1,0.707,0.007\n" +
    +															"2,0.003,0.707\n" +
    +															"3,0.003,0.500\n" +
    +															"4,0.500,0.500\n" +
    +															"5,0.500,0.007\n";
    +
    +
    +	private HITSData() {}
    +
    +	public static final DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
    +
    +		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
    --- End diff --
    
    You can use the diamond operator here: `new ArrayList<>();`. IntelliJ is generally accurate in detecting unnecessary code.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221637781
  
    Thanks @gallenvara! All tests pass now. I will merge :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-218446355
  
    Thanks for the updates @gallenvara. I left a few minor comments. Could you please also add the algorithm in the Gelly documentation under "library methods"? It should be good to merge after that :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61764033
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    --- End diff --
    
    Can replace 82:87 with `this(maxIterations, hitsParameter);`.
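
In Java, delegating to another constructor of the same class is written `this(...)`, while `super(...)` targets a parent-class constructor. A minimal sketch of the suggested deduplication follows. `HitsConfig` and its getters are hypothetical stand-ins for illustration, not the class from the pull request; only the doubling logic is copied from the diff.

```java
// Hypothetical stand-in that mirrors only the two fields shown in the diff.
final class HitsConfig {

    enum HITSParameter { HUB, AUTHORITY }

    private int maxIterations;
    private long numberOfVertices; // stays 0 when the caller does not provide it

    HitsConfig(int maxIterations, HITSParameter hitsParameter) {
        // Same doubling logic as in the diff under review.
        if (hitsParameter == HITSParameter.AUTHORITY) {
            this.maxIterations = maxIterations * 2;
        } else {
            this.maxIterations = maxIterations * 2 + 1;
        }
    }

    HitsConfig(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
        // Delegation must be the first statement of the constructor.
        this(maxIterations, hitsParameter);
        this.numberOfVertices = numberOfVertices;
    }

    int getMaxIterations() {
        return maxIterations;
    }

    long getNumberOfVertices() {
        return numberOfVertices;
    }

    public static void main(String[] args) {
        HitsConfig config = new HitsConfig(3, 5L, HITSParameter.HUB);
        System.out.println(config.getMaxIterations() + " supersteps for "
                + config.getNumberOfVertices() + " vertices");
    }
}
```

The delegation only works this way because the fields are not `final`; with `final` fields the shorter constructor would have to delegate to the longer one instead.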



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64056076
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -273,4 +289,23 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			return initVertexValue;
     		}
     	}
    +
    +	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, String> {
    --- End diff --
    
    Before committing the latest code, I tested the edges with a `String` label and with a `boolean` label, and found that the `boolean` label is not faster than the `String` label. So in the end I used "hub" and "authority", which is also very intuitive.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-218543446
  
    @greghogan thanks, the relevant code has been modified. :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63765321
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -46,40 +46,89 @@
      * represented a page that is linked by many different hubs.
      * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
      * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
      * <p>
      *
      * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
      */
     public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {
     
    +	private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
    +	private final static double MINIMUMTHRESHOLD = 1e-9;
    +
     	private int maxIterations;
    +	private long numberOfVertices;
    +	private double convergeThreshold;
    +
    +	public HITSAlgorithm(int maxIterations) {
    --- End diff --
    
    This and the following 3 constructors are missing Javadocs.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-217070480
  
    Thanks a lot, @greghogan @vasia.
    My (limited) understanding of the points you mentioned:
    1) The PR implements HITS by splitting the hub and authority updates into two phases. Because of value updating and normalization, the two phases cannot be handled in the same `superstep`. IMO, we can cache the hub update result, send it to the next authority iteration, and package the final authority and hub values as a `Tuple2` to return. What's your opinion on this?
    2) GSA does not support choosing the edge direction, so each vertex is updated based on the values of its in-neighbors only. In the implementation, the hub update uses the value of the target vertex, where the edge direction is out, and the authority update uses the value of the source vertex, where the edge direction is in. IMO, GSA would not work for the hub update.
    3) Yes, the vertices are initialized in the test. It would be better to initialize them inside the algorithm before the first iteration using `Graph.translateVertexValues`.
    4) Because the edge value is not used, the translation is optional; the edges can keep their original value and type.
    5) Yes, adding a threshold may reduce the iteration time for a small graph with a large `maxIteration`. (Do scatter-gather or GSA have a default threshold that checks whether values stopped updating during the iteration?)
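
The two-phase update described in points 1) and 2) can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: `TwoPhaseHitsSketch`, the edge-array representation, and the choice of sum normalization are made up for the example and are not the code from the pull request.

```java
import java.util.Arrays;

// Hypothetical, Flink-free sketch of HITS with separate authority and hub phases.
final class TwoPhaseHitsSketch {

    /** Returns {hubScores, authorityScores} after the given number of iterations. */
    static double[][] run(int[][] edges, int numVertices, int iterations) {
        double[] hub = new double[numVertices];
        double[] auth = new double[numVertices];
        // Both scores start out equal on every vertex.
        Arrays.fill(hub, 1.0);
        Arrays.fill(auth, 1.0);

        for (int i = 0; i < iterations; i++) {
            // Phase 1 (authority update): each edge target adds the hub score of its source.
            double[] newAuth = new double[numVertices];
            for (int[] e : edges) {
                newAuth[e[1]] += hub[e[0]];
            }
            auth = normalizeBySum(newAuth);

            // Phase 2 (hub update): each edge source adds the authority score of its target.
            double[] newHub = new double[numVertices];
            for (int[] e : edges) {
                newHub[e[0]] += auth[e[1]];
            }
            hub = normalizeBySum(newHub);
        }
        return new double[][] {hub, auth};
    }

    /** Divides every value by the total; returns the input unchanged if the total is zero. */
    static double[] normalizeBySum(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        if (sum == 0.0) {
            return values;
        }
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i] / sum;
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {0, 2}, {1, 2}};
        double[][] scores = run(edges, 3, 1);
        System.out.println("hub = " + Arrays.toString(scores[0]));
        System.out.println("authority = " + Arrays.toString(scores[1]));
    }
}
```

The hub phase reads along out-edges and the authority phase along in-edges, which is why a model that only aggregates over in-neighbors covers one of the two phases but not the other.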



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221603967
  
    The failures were in the `HITSAlgorithmITCase`.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221603165
  
    @vasia in which module were the failures? master has been quite unstable recently now that tests are properly failing.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61766847
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    --- End diff --
    
    Thanks for the review :) I will modify the code tomorrow because I don't have my computer with me right now.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61868075
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    Done!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216227985
  
    I will give it a try. The cases `iteration==1` and `iteration==maxIteration` may be a little difficult to handle.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216226071
  
    There is no way to get the aggregated value of the current superstep, as the aggregation happens at the superstep barrier. What you could do is normalize the vertex value in the `MessagingFunction` of the next superstep. E.g. say in superstep `i` you need to set each vertex value to `v / sum(i)`. You can instead set it to `v` and then propagate `v / sum(i)` in the scatter phase of superstep `i+1`. Would that work?
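
The deferred normalization suggested here can be simulated in plain Java. The sketch below is an assumption-laden illustration: `DeferredNormalizationSketch` is a hypothetical name, the supersteps are emulated with a simple loop instead of Flink's scatter-gather API, and a single score per vertex is propagated along edge direction rather than the full hub/authority pair.

```java
import java.util.Arrays;

// Hypothetical simulation: vertices store raw sums, and the scatter phase of the
// next superstep divides by the aggregate collected at the previous barrier.
final class DeferredNormalizationSketch {

    /** Runs the given number of supersteps and returns the normalized values. */
    static double[] run(int[][] edges, double[] initial, int supersteps) {
        double[] value = initial.clone();
        // Stands in for the aggregate that is readable from the previous superstep.
        double previousSum = nonZeroSum(value);

        for (int step = 1; step <= supersteps; step++) {
            double[] next = new double[value.length];
            // Scatter: normalize while sending, using the previous superstep's aggregate.
            for (int[] e : edges) {
                next[e[1]] += value[e[0]] / previousSum;
            }
            // Gather: keep the raw, unnormalized sum as the new vertex value.
            value = next;
            // Barrier: the aggregate of this superstep only becomes visible now.
            previousSum = nonZeroSum(value);
        }

        // Apply the last pending normalization before returning.
        double[] normalized = new double[value.length];
        for (int i = 0; i < value.length; i++) {
            normalized[i] = value[i] / previousSum;
        }
        return normalized;
    }

    /** Sum of all values, or 1.0 if the sum is zero, to avoid dividing by zero. */
    static double nonZeroSum(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum == 0.0 ? 1.0 : sum;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {2, 0}, {0, 2}};
        System.out.println(Arrays.toString(run(edges, new double[] {1, 1, 1}, 2)));
    }
}
```

Because the division is applied to every outgoing message, the receivers see the same numbers they would have seen if the sender had stored an already normalized value; the only extra work is one final normalization after the last superstep.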


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61780580
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    Is the edge value used in this algorithm?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-217142445
  
    1) Yes, that sounds right. The update function can choose which of Hub or Authority to set.
    
    2) Does `GSAConfiguration.setDirection` work the same as in scatter-gather?
    
    4) The algorithm will be faster if you first set the edge values to `NullValue`.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64070513
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -273,4 +289,23 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			return initVertexValue;
     		}
     	}
    +
    +	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, String> {
    --- End diff --
    
    A boolean is stored as a byte, and so is a bitmask, but a bitmask also lets the edge set be compressed for bidirectional edges. I'm not sure how common bidirectional edges are in real-world datasets.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63346119
  
    --- Diff: flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.examples.data.HITSData;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +
    +@RunWith(Parameterized.class)
    +public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
    --- End diff --
    
    Should all library and example algorithms be using `CollectionEnvironment`? (`env = ExecutionEnvironment.createCollectionsEnvironment();`)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64144375
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -273,4 +289,23 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			return initVertexValue;
     		}
     	}
    +
    +	public static class AuthorityEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, String> {
    --- End diff --
    
    @greghogan @vasia Bidirectional edges are not very common in typical uses of the HITS algorithm, so I did not write any logic to detect them; every edge is simply joined with its reverse edge.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-218467237
  
    @vasia Thanks a lot. The PR has been updated as you advised.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61864334
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    Haven't looked at your latest commit, but you can parameterize with "EV" as you have with "K".



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63347071
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represents a page that points to many other pages, and a good authority
    + * represented a page that is linked by many different hubs.
    + * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
    + * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + * <p>
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    +public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {
    +
    +	private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
    +	private final static double MINIMUMTHRESHOLD = 1e-9;
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +	private double convergeThreshold;
    +
    +	public HITSAlgorithm(int maxIterations) {
    +		this(maxIterations, MINIMUMTHRESHOLD);
    +	}
    +
    +	public HITSAlgorithm(double convergeThreshold) {
    +		this(MAXIMUMITERATION, convergeThreshold);
    +	}
    +
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices) {
    +		this(maxIterations, MINIMUMTHRESHOLD, numberOfVertices);
    +	}
    +
    +	public HITSAlgorithm(double convergeThreshold, long numberOfVertices) {
    +		this(MAXIMUMITERATION, convergeThreshold, numberOfVertices);
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations     the maximum number of iterations
    +	 * @param convergeThreshold convergence threshold for sum of scores
    +	 */
    +	public HITSAlgorithm(int maxIterations, double convergeThreshold) {
    +		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero.");
    +		Preconditions.checkArgument(convergeThreshold > 0.0, "Convergence threshold must be greater than zero.");
    +		this.maxIterations = maxIterations * 2 + 1;
    +		this.convergeThreshold = convergeThreshold;
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 *
    +	 * @param maxIterations     the maximum number of iterations
    +	 * @param convergeThreshold convergence threshold for sum of scores
    +	 * @param numberOfVertices  the number of vertices in the graph
    +	 */
    +	public HITSAlgorithm(int maxIterations, double convergeThreshold, long numberOfVertices) {
    +		this(maxIterations, convergeThreshold);
    +		Preconditions.checkArgument(numberOfVertices > 0, "Number of vertices must be greater than zero.");
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>> run(Graph<K, VV, EV> netGraph) throws Exception {
    --- End diff --
    
    `netGraph` -> `graph`?



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63353881
  
    --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.examples.data;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.types.NullValue;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Provides the data set used for the HITS test program.
    + */
    +public class HITSData {
    +
    +	public static final String VALUE_AFTER_3_ITERATIONS = "1,0.707,0.007\n" +
    --- End diff --
    
    Yes, I used approximate values for the result.
    I will replace the number of iterations with a larger one.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220143133
  
    @vasia that is a very good idea. I've been doing this for `TriangleListing` for directed clustering coefficient and it should probably be a separate `GraphAlgorithm`. There is a bitmask for whether the edge is `u -> v`, `u <- v`, or `u <-> v`.
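
The direction bitmask mentioned above can be illustrated with a minimal plain-Java sketch. It is not taken from `TriangleListing` or from this PR: the class name, the constants, and the "min,max" string key are hypothetical, and it assumes vertex IDs are longs. Each unordered vertex pair keeps one record whose bits say which directions were seen.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: one record per vertex pair, with a direction bitmask. */
final class EdgeDirectionMask {
    static final byte FORWARD = 0b01;             // u -> v
    static final byte REVERSE = 0b10;             // u <- v
    static final byte MUTUAL = FORWARD | REVERSE; // u <-> v

    /** Key is "min,max" of the two endpoint IDs; value is the OR of the direction bits seen. */
    static Map<String, Byte> compress(long[][] edges) {
        Map<String, Byte> masks = new HashMap<>();
        for (long[] e : edges) {
            long u = Math.min(e[0], e[1]);
            long v = Math.max(e[0], e[1]);
            // An edge written from the smaller ID to the larger one counts as FORWARD.
            byte bit = (e[0] <= e[1]) ? FORWARD : REVERSE;
            masks.merge(u + "," + v, bit, (a, b) -> (byte) (a | b));
        }
        return masks;
    }
}
```

With this encoding a pair of mutual edges occupies a single record, which is where the compression on bidirectional edges would come from.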



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61832489
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    --- End diff --
    
    Maybe `this(maxIterations, hitsParameter);` :)
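
As a minimal sketch of that delegation, assuming a stand-in class (`HitsConfig` is hypothetical and only mirrors the two constructors in the diff above), the three-argument constructor can reuse the two-argument one instead of repeating the branch:

```java
/** Hypothetical stand-in for the constructors under review; not the PR's class. */
final class HitsConfig {
    enum HITSParameter { HUB, AUTHORITY }

    final int maxIterations;
    long numberOfVertices; // 0 means "unknown"

    HitsConfig(int maxIterations, HITSParameter hitsParameter) {
        // Same arithmetic as the diff: doubled for AUTHORITY, doubled plus one for HUB.
        this.maxIterations = (hitsParameter == HITSParameter.AUTHORITY)
                ? maxIterations * 2
                : maxIterations * 2 + 1;
    }

    HitsConfig(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
        this(maxIterations, hitsParameter); // delegation must be the first statement
        this.numberOfVertices = numberOfVertices;
    }
}
```

Here `numberOfVertices` is left non-final so that the delegating constructor can assign it after the `this(...)` call.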



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220628229
  
    Thank you for the update @gallenvara and for your patience with our continuous comments :)
    How many edges did the graphs in your experiment have?



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220134968
  
    Thanks for the update @gallenvara and @greghogan for the review.
    I have left a few comments in places where I couldn't figure out the code logic.
    
    There is one more thing I'd like to discuss with you. The current implementation uses `EdgeDirection.ALL`, but this feature is not implemented very efficiently. It actually performs two coGroups (one on the source ID and one on the target ID) and unions the outputs to create the result of the messaging function. Instead, I was thinking we could do the following: mark the input graph's edges with an "authority" label and add opposite-direction edges with a "hub" label. Then we can use the default edge direction (IN) to perform the iteration. In even supersteps, each vertex sends messages only along "authority" edges, and in odd supersteps only along "hub" edges. Does this make sense?
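
The labeled-edge idea can be simulated in plain Java, without Gelly, to check that alternating between the two labels reproduces the usual hub and authority updates. This is only a sketch under stated assumptions: every name below is hypothetical, vertices are the integers 0..n-1, messages always travel from an edge's source to its target, supersteps are counted from zero, and scores are scaled to unit Euclidean length after each step (the PR's own normalization may differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation of the labeled-edge idea; all names here are hypothetical. */
final class LabeledEdgeHits {
    enum Label { AUTHORITY, HUB }

    static final class LabeledEdge {
        final int source;
        final int target;
        final Label label;

        LabeledEdge(int source, int target, Label label) {
            this.source = source;
            this.target = target;
            this.label = label;
        }
    }

    /** Each input edge u -> v yields an AUTHORITY edge u -> v and a HUB edge v -> u. */
    static List<LabeledEdge> label(int[][] edges) {
        List<LabeledEdge> labeled = new ArrayList<>();
        for (int[] e : edges) {
            labeled.add(new LabeledEdge(e[0], e[1], Label.AUTHORITY));
            labeled.add(new LabeledEdge(e[1], e[0], Label.HUB));
        }
        return labeled;
    }

    /** Returns {hubScores, authorityScores} after the given number of rounds. */
    static double[][] run(int vertexCount, int[][] edges, int rounds) {
        List<LabeledEdge> labeled = label(edges);
        double[] hub = new double[vertexCount];
        double[] authority = new double[vertexCount];
        Arrays.fill(hub, 1.0);
        Arrays.fill(authority, 1.0);

        for (int superstep = 0; superstep < 2 * rounds; superstep++) {
            // Even supersteps use AUTHORITY edges and carry hub scores;
            // odd supersteps use HUB edges and carry authority scores.
            boolean authorityStep = superstep % 2 == 0;
            Label active = authorityStep ? Label.AUTHORITY : Label.HUB;
            double[] sent = authorityStep ? hub : authority;
            double[] received = new double[vertexCount];
            for (LabeledEdge e : labeled) {
                if (e.label == active) {
                    received[e.target] += sent[e.source];
                }
            }
            normalize(received);
            if (authorityStep) {
                authority = received;
            } else {
                hub = received;
            }
        }
        return new double[][] {hub, authority};
    }

    /** Scales the vector to unit Euclidean length; leaves an all-zero vector alone. */
    private static void normalize(double[] values) {
        double sumOfSquares = 0.0;
        for (double v : values) {
            sumOfSquares += v * v;
        }
        if (sumOfSquares == 0.0) {
            return;
        }
        double norm = Math.sqrt(sumOfSquares);
        for (int i = 0; i < values.length; i++) {
            values[i] /= norm;
        }
    }
}
```

For example, `LabeledEdgeHits.run(3, new int[][] {{0, 1}, {0, 2}, {1, 2}}, 5)` ranks vertex 2 as the strongest authority and vertex 0 as the strongest hub, since vertex 2 has the most incoming links and vertex 0 the most outgoing ones.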



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63765714
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -160,7 +240,7 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     			double iterationValueSum = 1.0;
     
     			if (getSuperstepNumber() > 1) {
    -				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("sumVertexValue")).getValue());
    +				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
     			for (Edge<K, EV> edge : getEdges()) {
     				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    --- End diff --
    
    Could you please explain what this line and the following one are doing? The condition is the same in both.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220147037
  
    I also like @vasia's idea since we might get to add the analytic `DyadicCensus` (basically counting how many edges are `u <-> v` and `u -> v`). The relative performance of this implementation will depend on the ratio of mutual edges. Are there one or more canonical data sets for running a comparison? The two extremes are an undirected graph and a graph with no mutual edges, which could be artificially constructed.
    
    Also, I recommend copying the current code into a new class to test a new implementation.
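
The two extremes mentioned above could be constructed artificially along these lines. This is a rough plain-Java sketch, not the `DyadicCensus` analytic itself: the class and method names are hypothetical, vertices are the integers 0..n-1, and the edge list is assumed to contain no duplicates and no self-loops.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for building test graphs and measuring their share of mutual edges. */
final class MutualEdgeRatio {
    /** Fraction of directed edges (u, v) whose reverse (v, u) is also present. */
    static double mutualRatio(int[][] edges) {
        if (edges.length == 0) {
            return 0.0;
        }
        Set<Long> present = new HashSet<>();
        for (int[] e : edges) {
            present.add(key(e[0], e[1]));
        }
        int mutual = 0;
        for (int[] e : edges) {
            if (present.contains(key(e[1], e[0]))) {
                mutual++;
            }
        }
        return (double) mutual / edges.length;
    }

    /** Packs an ordered pair of non-negative vertex IDs into one long. */
    private static long key(int u, int v) {
        return ((long) u << 32) | (v & 0xFFFFFFFFL);
    }

    /** Directed ring 0 -> 1 -> ... -> n-1 -> 0; it has no mutual edges when n >= 3. */
    static int[][] directedRing(int n) {
        int[][] edges = new int[n][];
        for (int i = 0; i < n; i++) {
            edges[i] = new int[] {i, (i + 1) % n};
        }
        return edges;
    }

    /** The same ring with every edge also reversed, so every edge is mutual. */
    static int[][] bidirectionalRing(int n) {
        int[][] edges = new int[2 * n][];
        for (int i = 0; i < n; i++) {
            int next = (i + 1) % n;
            edges[2 * i] = new int[] {i, next};
            edges[2 * i + 1] = new int[] {next, i};
        }
        return edges;
    }
}
```

Graphs between the two extremes can be obtained by reversing only a chosen fraction of the ring's edges, which gives a controllable ratio for a performance comparison.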



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61831064
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    It is not used; it may be good to set it to `NullValue`.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61866752
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    yes, you are right.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61866283
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    --- End diff --
    
    It can be parametrized with "EV" and the algorithm can set it to `NullValue` internally. This way, users won't have to first map their input graphs to `NullValue` edge value types.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220656004
  
    Stanford provides several old graph datasets at https://snap.stanford.edu/data/index.html which might prove a better standard for benchmarking.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221394386
  
    Hey @gallenvara,
    I had a private chat with @greghogan about this PR. We think that we should change the label type to a boolean instead of string. It should make a difference for large graph inputs. After this last change we'll go ahead and finally merge this :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-219605318
  
    @greghogan thanks for your advice; the relevant code has been modified. :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63346245
  
    --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.examples.data;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.types.NullValue;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Provides the data set used for the HITS test program.
    + */
    +public class HITSData {
    +
    +	public static final String VALUE_AFTER_3_ITERATIONS = "1,0.707,0.007\n" +
    --- End diff --
    
    Are we better off testing for convergence (or after a larger number of iterations) rather than testing an early state after a few iterations which seems particularly brittle?
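
A minimal plain-Java sketch of the "test for convergence" idea, written without Flink so it stands alone. The class name `HitsConvergenceSketch`, the edge-array representation, the sum normalization and the threshold parameter are all assumptions made for illustration; this is not the PR's scatter-gather implementation.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; not part of the pull request.
final class HitsConvergenceSketch {

    /**
     * Iterates HITS until the total score change drops below the threshold
     * or maxIterations is reached. Returns {hubScores, authorityScores}.
     * Each edge is an int pair {source, target}.
     */
    static double[][] run(int n, int[][] edges, int maxIterations, double threshold) {
        double[] hub = new double[n];
        double[] auth = new double[n];
        Arrays.fill(hub, 1.0);
        Arrays.fill(auth, 1.0);

        for (int i = 0; i < maxIterations; i++) {
            // Authority update: sum the hub scores of all vertices pointing here.
            double[] newAuth = new double[n];
            for (int[] e : edges) {
                newAuth[e[1]] += hub[e[0]];
            }
            normalize(newAuth);

            // Hub update: sum the fresh authority scores of all targets.
            double[] newHub = new double[n];
            for (int[] e : edges) {
                newHub[e[0]] += newAuth[e[1]];
            }
            normalize(newHub);

            // Total absolute change across both score vectors.
            double delta = 0.0;
            for (int v = 0; v < n; v++) {
                delta += Math.abs(newHub[v] - hub[v]) + Math.abs(newAuth[v] - auth[v]);
            }
            hub = newHub;
            auth = newAuth;
            if (delta < threshold) {
                break;
            }
        }
        return new double[][] {hub, auth};
    }

    // Sum normalization (an assumption here): scale so the values add up to 1.
    static void normalize(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        if (sum == 0.0) {
            return;
        }
        for (int i = 0; i < values.length; i++) {
            values[i] /= sum;
        }
    }
}
```

A test built this way asserts on the converged scores, so it no longer depends on the exact state after a small, fixed number of iterations.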



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220292049
  
    Hi @vasia, @greghogan. Thanks a lot for your advice! I have modified the code to support the iteration with `EdgeDirection.OUT` instead of `EdgeDirection.ALL`, with an edge label. I also wrote a test with a big graph to compare the two implementations; the result is:
    ![hits_compare](https://cloud.githubusercontent.com/assets/12931563/15391189/627af152-1df2-11e6-9e43-33706bd00cdf.PNG)
    The number of edges is the same in both cases, and the relations between pairs of nodes are added randomly.




[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63856679
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -46,40 +46,89 @@
      * represented a page that is linked by many different hubs.
      * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
      * The implementation assumes that the two score are the same in each vertex at the beginning.
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
      * <p>
      *
      * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
      */
     public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {
     
    +	private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
    +	private final static double MINIMUMTHRESHOLD = 1e-9;
    +
     	private int maxIterations;
    +	private long numberOfVertices;
    +	private double convergeThreshold;
    +
    +	public HITSAlgorithm(int maxIterations) {
    --- End diff --
    
    Done!



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61764570
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    --- End diff --
    
    Verify positive `numberOfVertices` with `Preconditions.checkArgument`. Same for `maxIterations` in other constructor.
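
A small sketch of the validation being asked for. To keep it compilable without Flink on the classpath, `HitsConfig` and its local `checkArgument` helper are hypothetical stand-ins; in the PR itself the call would be `Preconditions.checkArgument` from `org.apache.flink.util`. The error messages are illustrative only.

```java
// Hypothetical stand-in for the constructor arguments of HITSAlgorithm.
final class HitsConfig {

    final int maxIterations;
    final long numberOfVertices;

    HitsConfig(int maxIterations, long numberOfVertices) {
        // Fail fast on non-positive arguments before any field is set.
        checkArgument(maxIterations > 0, "Number of iterations must be greater than zero.");
        checkArgument(numberOfVertices > 0, "Number of vertices must be greater than zero.");
        this.maxIterations = maxIterations;
        this.numberOfVertices = numberOfVertices;
    }

    // Local helper with the same contract as a precondition check:
    // throw IllegalArgumentException when the condition does not hold.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

With this in place, `new HitsConfig(0, 5L)` is rejected at construction time instead of producing a job that silently runs zero iterations.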



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61772427
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    --- End diff --
    
    The two phases depend on each other. The hub scores cannot be updated until the authority scores have been updated and normalized, and the same holds for the authority scores. So the two updates happen one after the other; I mean they belong to different iteration steps. Returning a `Tuple2` value would mean that we get hub and authority in the same `superstep`. That is why `HITSParameter` was introduced.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-220633090
  
    @vasia vertex num: 10000, edge num: 30000; vertex num: 30000, edge num: 100000.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221600182
  
    Hey @gallenvara,
    I was about to merge this, but I see test failures after rebasing on top of master.
    Can you please (1) rebase on top of the latest master and squash your commits and (2) investigate what's wrong with the tests?
    Thank you!



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61769569
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    --- End diff --
    
    Could we return both the hub score and authority score in a `Tuple2` rather than having the user choose between the two scores?
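
To make the suggestion concrete, here is a plain-Java sketch in which every vertex carries a pair of scores and both are returned together. It assumes a simple edge-array graph and sum normalization, and the class name `HitsBothScoresSketch` is hypothetical; it illustrates the data shape only, not how the scatter-gather supersteps in the PR would be organized.

```java
// Hypothetical stand-alone sketch; not part of the pull request.
final class HitsBothScoresSketch {

    /**
     * Returns scores[v] = {hubScore, authorityScore} for every vertex v.
     * Each edge is an int pair {source, target}.
     */
    static double[][] run(int n, int[][] edges, int iterations) {
        double[][] scores = new double[n][2];
        for (double[] s : scores) {
            s[0] = 1.0; // hub score
            s[1] = 1.0; // authority score
        }

        for (int i = 0; i < iterations; i++) {
            // Phase 1: authority scores from the current hub scores.
            double[] auth = new double[n];
            for (int[] e : edges) {
                auth[e[1]] += scores[e[0]][0];
            }
            normalize(auth);

            // Phase 2: hub scores from the freshly normalized authority scores.
            double[] hub = new double[n];
            for (int[] e : edges) {
                hub[e[0]] += auth[e[1]];
            }
            normalize(hub);

            // Store both values on the vertex, like a two-field tuple.
            for (int v = 0; v < n; v++) {
                scores[v][0] = hub[v];
                scores[v][1] = auth[v];
            }
        }
        return scores;
    }

    // Sum normalization (an assumption here): scale so the values add up to 1.
    static void normalize(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        if (sum == 0.0) {
            return;
        }
        for (int i = 0; i < values.length; i++) {
            values[i] /= sum;
        }
    }
}
```

The two phases still run one after the other inside each iteration, but the caller receives both scores per vertex and does not have to choose between them up front.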



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221606502
  
    @vasia give me a few minutes and I will take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62838726
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    --- End diff --
    
    represented => represents
    pointed => points
    was linked => is linked
    the two value => the two values*



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62838798
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.graph.utils.NullValueEdgeMapper;
    +import org.apache.flink.types.DoubleValue;
    +import org.apache.flink.types.NullValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + *
    + * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
    + */
    --- End diff --
    
    Can you also please add a comment about the result type? Which tuple field is the authority score and which is the hub?



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61770727
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    --- End diff --
    
    Thanks for the contribution :) Pull requests are always asynchronous.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61831147
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    +		}
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumAllValue", new DoubleSumAggregator());
    +
    +		return netGraph.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +				new MessageUpdate<K>(maxIterations), maxIterations, parameter).getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalize the value.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Double, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumAllValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			if (getSuperstepNumber() != maxIteration) {
    +				setNewVertexValue(updateValue);
    +				doubleSumAggregator.aggregate(updateValue);
    +			} else {
    +				setNewVertexValue(vertex.getValue() / updateValue);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sum all the
    +	 * value in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K> extends MessagingFunction<K, Double, Double, Double> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Double> vertex) {
    +			for (Edge<K, Double> edge : getEdges()) {
    +				if (getSuperstepNumber() % 2 == 1) {
    --- End diff --
    
    Yes, the logic here is a little muddled. I will refactor it.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216190461
  
    @vasia Could you help review this? :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-216224456
  
    @vasia I noticed that `registerAggregator` can aggregate values in a distributed environment, and I used it when I first started on this issue. In the end it did not work, because the aggregated value can only be read in the next `superstep`, whereas I want to read it in the `VertexUpdateFunction` of the current `superstep`. Do you have any suggestions?
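
The timing issue raised in this comment can be illustrated outside Flink. The following is a minimal plain-Java sketch, not Gelly code: the class `SuperstepAggregateSketch`, the method `visibleAggregates`, and the per-vertex contributions are all hypothetical. It assumes the usual bulk-synchronous behavior, in which values aggregated during one superstep are published at the barrier and can only be read in the next superstep.

```java
import java.util.ArrayList;
import java.util.List;

final class SuperstepAggregateSketch {

    /**
     * Simulates the given number of supersteps and records, for each one,
     * the aggregate that vertex code could read while that superstep runs.
     * Each (hypothetical) vertex contributes "contribution * superstep number".
     */
    static List<Double> visibleAggregates(double[] contributions, int supersteps) {
        List<Double> visible = new ArrayList<>();
        double previousAggregate = 0.0; // nothing was aggregated before superstep 1

        for (int step = 1; step <= supersteps; step++) {
            // During a superstep only the total of the previous superstep is readable.
            visible.add(previousAggregate);

            // Contributions of the current superstep accumulate separately ...
            double currentAggregate = 0.0;
            for (double contribution : contributions) {
                currentAggregate += contribution * step;
            }

            // ... and are published only at the barrier that ends the superstep.
            previousAggregate = currentAggregate;
        }
        return visible;
    }

    public static void main(String[] args) {
        // prints [0.0, 6.0, 12.0]
        System.out.println(visibleAggregates(new double[] {1.0, 2.0, 3.0}, 3));
    }
}
```

Under these assumptions, a normalization that needs the total of the current superstep has to be applied one superstep later, for example by dividing the outgoing messages by the previous aggregate.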



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r62837591
  
    --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.examples.data;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.types.NullValue;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Provides the default data set used for the HITS test program.
    + * If no parameters are given to the program, the default edge data set is used.
    --- End diff --
    
    I guess you copied this comment from another similar class. The "If no parameters given..." refers to Gelly examples, which run with default data if no parameters are provided. In this case HITS is implemented as a library method, so this comment can be removed. This data is only used for testing as far as I can tell :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-218790864
  
    @vasia @greghogan Could you help review the new commit? :)



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1956#issuecomment-221452144
  
    @vasia fixed!



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r64055156
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -243,18 +255,22 @@ public void sendMessages(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex) {
     				iterationValueSum = Math.sqrt(((DoubleValue) getPreviousIterationAggregate("updatedValueSum")).getValue());
     			}
     			for (Edge<K, EV> edge : getEdges()) {
    -				K messageSource = getSuperstepNumber() % 2 == 1 ? edge.getSource() : edge.getTarget();
    -				K messageTarget = getSuperstepNumber() % 2 == 1 ? edge.getTarget() : edge.getSource();
    -				double messageValue = getSuperstepNumber() % 2 == 1 ? vertex.getValue().f0.getValue() : vertex.getValue().f1.getValue();
    -
    -				if (!messageTarget.equals(vertex.getId())) {
    -					if (getSuperstepNumber() != maxIteration) {
    -						sendMessageTo(messageTarget, messageValue / iterationValueSum);
    -
    -						// in order to make every vertex updated
    -						sendMessageTo(messageSource, 0.0);
    +				if (getSuperstepNumber() != maxIteration) {
    +					if (getSuperstepNumber() % 2 == 1) {
    +						if (edge.getValue().equals("Authority")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f0.getValue() / iterationValueSum);
    +						}
     					} else {
    -						sendMessageTo(messageSource, iterationValueSum);
    +						if (edge.getValue().equals("Hub")) {
    +							sendMessageTo(edge.getTarget(), vertex.getValue().f1.getValue() / iterationValueSum);
    +						}
    +					}
    +					
    +					// make all the vertices be updated
    +					sendMessageTo(edge.getSource(), 0.0);
    --- End diff --
    
    Sorry, this line should be removed. :)
    In the previous commit, which had no edge labels, I used the same edge for both the hub and the authority update, and this line kept every vertex updating. In the newest implementation, with edge labels, we can drop it because the added edges do the same job.
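
The edge-label idea can be sketched in plain Java, independent of Gelly. Everything here is an assumption for illustration: the class `LabeledEdgeHitsSketch`, its helper methods, the integer vertex ids, and the choice to normalize by the Euclidean norm right after each step (the diff above appears to defer normalization by one superstep through an aggregator instead). For every link u -> v the sketch adds an "Authority" edge u -> v and a reversed "Hub" edge v -> u, so each update follows only the edges with the matching label.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LabeledEdgeHitsSketch {

    /** Directed edge tagged with the score it is allowed to propagate. */
    static final class LabeledEdge {
        final int source;
        final int target;
        final String label;

        LabeledEdge(int source, int target, String label) {
            this.source = source;
            this.target = target;
            this.label = label;
        }
    }

    /** For each link u -> v, adds an "Authority" edge u -> v and a "Hub" edge v -> u. */
    static List<LabeledEdge> labelEdges(int[][] links) {
        List<LabeledEdge> edges = new ArrayList<>();
        for (int[] link : links) {
            edges.add(new LabeledEdge(link[0], link[1], "Authority"));
            edges.add(new LabeledEdge(link[1], link[0], "Hub"));
        }
        return edges;
    }

    /** Sums the source values over all edges with the given label, per target vertex. */
    static double[] propagate(List<LabeledEdge> edges, String label, double[] values) {
        double[] next = new double[values.length];
        for (LabeledEdge edge : edges) {
            if (edge.label.equals(label)) {
                next[edge.target] += values[edge.source];
            }
        }
        return next;
    }

    /** Scales the vector to unit Euclidean length; an all-zero vector is returned unchanged. */
    static double[] normalize(double[] values) {
        double sumOfSquares = 0.0;
        for (double value : values) {
            sumOfSquares += value * value;
        }
        double norm = Math.sqrt(sumOfSquares);
        if (norm == 0.0) {
            return values;
        }
        double[] scaled = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            scaled[i] = values[i] / norm;
        }
        return scaled;
    }

    /** Returns {hubScores, authorityScores} after the given number of rounds. */
    static double[][] run(int vertexCount, int[][] links, int rounds) {
        List<LabeledEdge> edges = labelEdges(links);
        double[] hub = new double[vertexCount];
        double[] authority = new double[vertexCount];
        Arrays.fill(hub, 1.0);
        Arrays.fill(authority, 1.0);
        for (int round = 0; round < rounds; round++) {
            // Authority step: hub scores travel along "Authority" edges.
            authority = normalize(propagate(edges, "Authority", hub));
            // Hub step: authority scores travel along the reversed "Hub" edges.
            hub = normalize(propagate(edges, "Hub", authority));
        }
        return new double[][] {hub, authority};
    }

    public static void main(String[] args) {
        double[][] scores = run(3, new int[][] {{0, 1}, {0, 2}, {1, 2}}, 3);
        System.out.println("hub       = " + Arrays.toString(scores[0]));
        System.out.println("authority = " + Arrays.toString(scores[1]));
    }
}
```

Because this sketch recomputes every vertex from scratch in each step, it needs no zero-valued filler message; a vertex that receives nothing simply ends up with a score of 0.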



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r63354507
  
    --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/HITSData.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.examples.data;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.types.NullValue;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Provides the data set used for the HITS test program.
    + */
    +public class HITSData {
    +
    +	public static final String VALUE_AFTER_3_ITERATIONS = "1,0.707,0.007\n" +
    +															"2,0.003,0.707\n" +
    +															"3,0.003,0.500\n" +
    +															"4,0.500,0.500\n" +
    +															"5,0.500,0.007\n";
    +
    +
    +	private HITSData() {}
    +
    +	public static final DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
    +
    +		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
    --- End diff --
    
    OK, I will modify the relevant code in this PR.



[GitHub] flink pull request: [FLINK-2044] [gelly] Implementation of Gelly H...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1956#discussion_r61767543
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library;
    +
    +import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
    +import org.apache.flink.api.java.DataSet;
    +
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.EdgeDirection;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.spargel.MessageIterator;
    +import org.apache.flink.graph.spargel.MessagingFunction;
    +import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
    +import org.apache.flink.graph.spargel.VertexUpdateFunction;
    +import org.apache.flink.types.DoubleValue;
    +
    +/**
    + * This is an implementation of HITS algorithm, using a scatter-gather iteration.
    + * The user can define the maximum number of iterations. HITS algorithm is determined by two parameters,
    + * hubs and authorities. A good hub represented a page that pointed to many other pages, and a good authority
    + * represented a page that was linked by many different hubs. The implementation assumes that the two value on
    + * every vertex are the same at the beginning.
    + * <p>
    + * If the number of vertices of the input graph is known, it should be provided as a parameter
    + * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
    + */
    +public class HITSAlgorithm<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    +
    +	public static enum HITSParameter {
    +		HUB,
    +		AUTHORITY
    +	}
    +
    +	private int maxIterations;
    +	private long numberOfVertices;
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is known,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, long, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +	}
    +
    +	/**
    +	 * Creates an instance of HITS algorithm.
    +	 * If the number of vertices of the input graph is unknown,
    +	 * use the {@link HITSAlgorithm#HITSAlgorithm(int, HITSParameter)} constructor instead.
    +	 *
    +	 * @param maxIterations the maximum number of iterations
    +	 * @param hitsParameter the type of final web pages users want to get by this algorithm
    +	 */
    +	public HITSAlgorithm(int maxIterations, long numberOfVertices, HITSParameter hitsParameter) {
    +		if (hitsParameter == HITSParameter.AUTHORITY) {
    +			this.maxIterations = maxIterations * 2;
    +		} else {
    +			this.maxIterations = maxIterations * 2 + 1;
    +		}
    +		this.numberOfVertices = numberOfVertices;
    +	}
    +
    +	@Override
    +	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> netGraph) throws Exception {
    +		if (this.numberOfVertices == 0) {
    +			this.numberOfVertices = netGraph.numberOfVertices();
    +		}
    +
    +		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
    +		parameter.setDirection(EdgeDirection.ALL);
    +		parameter.registerAggregator("sumAllValue", new DoubleSumAggregator());
    +
    +		return netGraph.runScatterGatherIteration(new VertexUpdate<K>(maxIterations),
    +				new MessageUpdate<K>(maxIterations), maxIterations, parameter).getVertices();
    +	}
    +
    +	/**
    +	 * Function that updates the value of a vertex by summing up the partial
    +	 * values from all messages and normalizing the result.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Double, Double> {
    +		private int maxIteration;
    +		private DoubleSumAggregator doubleSumAggregator;
    +
    +		public VertexUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void preSuperstep() {
    +			doubleSumAggregator = getIterationAggregator("sumAllValue");
    +		}
    +
    +		@Override
    +		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
    +			double updateValue = 0;
    +
    +			for (double element : inMessages) {
    +				if (getSuperstepNumber() == maxIteration) {
    +					updateValue = element;
    +					break;
    +				}
    +				updateValue += element;
    +			}
    +
    +			if (getSuperstepNumber() != maxIteration) {
    +				setNewVertexValue(updateValue);
    +				doubleSumAggregator.aggregate(updateValue);
    +			} else {
    +				setNewVertexValue(vertex.getValue() / updateValue);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Distributes the value of a vertex among all neighbor vertices and sums all
    +	 * the values in every superstep.
    +	 */
    +	@SuppressWarnings("serial")
    +	public static final class MessageUpdate<K> extends MessagingFunction<K, Double, Double, Double> {
    +		private int maxIteration;
    +
    +		public MessageUpdate(int maxIteration) {
    +			this.maxIteration = maxIteration;
    +		}
    +
    +		@Override
    +		public void sendMessages(Vertex<K, Double> vertex) {
    +			for (Edge<K, Double> edge : getEdges()) {
    +				if (getSuperstepNumber() % 2 == 1) {
    +					if (edge.getTarget() != vertex.getId()) {
    --- End diff --
    
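    This `!=` compares object references, not values. Since `K` is a generic type, two IDs with equal contents can still be different objects, so the check should use `!edge.getTarget().equals(vertex.getId())`. Same below.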
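
To illustrate the point about object equality, here is a minimal standalone sketch. It does not use any Flink or Gelly classes; `KeyEqualitySketch`, `differsByReference`, and `differsByValue` are hypothetical names made up for this example, and `String` stands in for the generic key type `K`. It only shows how `!=` and `equals` can disagree for two keys with the same contents.

```java
import java.util.Objects;

public class KeyEqualitySketch {

    // Mirrors the pattern in the diff: `!=` on a generic type compares references.
    public static <K> boolean differsByReference(K edgeTarget, K vertexId) {
        return edgeTarget != vertexId;
    }

    // Value-based comparison, which is presumably what the check is meant to do.
    public static <K> boolean differsByValue(K edgeTarget, K vertexId) {
        return !Objects.equals(edgeTarget, vertexId);
    }

    public static void main(String[] args) {
        // Two distinct objects with identical contents, as can happen when
        // keys are deserialized independently.
        String vertexId = new String("v1");
        String edgeTarget = new String("v1");

        System.out.println(differsByReference(edgeTarget, vertexId)); // prints "true"
        System.out.println(differsByValue(edgeTarget, vertexId));     // prints "false"
    }
}
```

With the reference comparison, an edge that points back at the same vertex ID would still be treated as pointing at a different vertex, so the branch guarded by the check would run when it should not.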

