You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:16 UTC
[07/15] flink git commit: [FLINK-6709] [gelly] Activate strict
checkstyle for flink-gellies
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
new file mode 100644
index 0000000..e8422ac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.linkanalysis.Functions.SumScore;
+import org.apache.flink.graph.library.linkanalysis.HITS.Result;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Hyperlink-Induced Topic Search computes two interdependent scores for every
+ * vertex in a directed graph. A good "hub" links to good "authorities" and
+ * good "authorities" are linked from good "hubs".
+ *
+ * <p>This algorithm can be configured to terminate either by a limit on the number
+ * of iterations, a convergence threshold, or both.
+ *
+ * <p>Each iteration the hub and authority scores are normalized by dividing by
+ * the square root of the sum of squared scores over all vertices. Scores are
+ * computed only for vertices with at least one incident edge since the
+ * algorithm operates on the edge set alone.
+ *
+ * <p>See http://www.cs.cornell.edu/home/kleinber/auth.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class HITS<K, VV, EV>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+
+	// name of the iteration aggregator which sums the change in scores
+	private static final String CHANGE_IN_SCORES = "change in scores";
+
+	// broadcast variable names for the per-iteration normalization sums
+	private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared";
+
+	private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";
+
+	// Required configuration
+	private int maxIterations;
+
+	private double convergenceThreshold;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
+	 *
+	 * @param iterations fixed number of iterations
+	 */
+	public HITS(int iterations) {
+		this(iterations, Double.MAX_VALUE);
+	}
+
+	/**
+	 * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
+	 * terminates when the total change in hub and authority scores over all
+	 * vertices falls to or below the given threshold value.
+	 *
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public HITS(double convergenceThreshold) {
+		this(Integer.MAX_VALUE, convergenceThreshold);
+	}
+
+	/**
+	 * Hyperlink-Induced Topic Search with a convergence threshold and a maximum
+	 * iteration count. The algorithm terminates after either the given number
+	 * of iterations or when the total change in hub and authority scores over all
+	 * vertices falls to or below the given threshold value.
+	 *
+	 * @param maxIterations maximum number of iterations
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public HITS(int maxIterations, double convergenceThreshold) {
+		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
+		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
+
+		this.maxIterations = maxIterations;
+		this.convergenceThreshold = convergenceThreshold;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public HITS<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	protected String getAlgorithmName() {
+		return HITS.class.getName();
+	}
+
+	/**
+	 * Merge another HITS configuration into this one so a single computation
+	 * can serve both: the larger iteration limit, the smaller convergence
+	 * threshold, and the smaller explicitly-set parallelism are kept.
+	 *
+	 * @param other the configuration to merge
+	 * @return true if {@code other} is a HITS instance and was merged
+	 */
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (!HITS.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		HITS rhs = (HITS) other;
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
+			throws Exception {
+		DataSet<Tuple2<K, K>> edges = input
+			.getEdges()
+			.map(new ExtractEdgeIDs<K, EV>())
+				.setParallelism(parallelism)
+				.name("Extract edge IDs");
+
+		// ID, hub, authority
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
+			.map(new InitializeScores<K>())
+				.setParallelism(parallelism)
+				.name("Initial scores")
+			.groupBy(0)
+			.reduce(new SumScores<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
+			.iterate(maxIterations);
+
+		// ID, hubbiness
+		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative
+			.coGroup(edges)
+			.where(0)
+			.equalTo(1)
+			.with(new Hubbiness<K>())
+				.setParallelism(parallelism)
+				.name("Hub")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// sum-of-hubbiness-squared
+		DataSet<DoubleValue> hubbinessSumSquared = hubbiness
+			.map(new Square<K>())
+				.setParallelism(parallelism)
+				.name("Square")
+			.reduce(new Sum())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ID, new authority
+		DataSet<Tuple2<K, DoubleValue>> authority = hubbiness
+			.coGroup(edges)
+			.where(0)
+			.equalTo(0)
+			.with(new Authority<K>())
+				.setParallelism(parallelism)
+				.name("Authority")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// sum-of-authority-squared
+		DataSet<DoubleValue> authoritySumSquared = authority
+			.map(new Square<K>())
+				.setParallelism(parallelism)
+				.name("Square")
+			.reduce(new Sum())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ID, normalized hubbiness, normalized authority
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> scores = hubbiness
+			.fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE)
+			.where(0)
+			.equalTo(0)
+			.with(new JoinAndNormalizeHubAndAuthority<K>())
+			.withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
+			.withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
+				.setParallelism(parallelism)
+				.name("Join scores");
+
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> passThrough;
+
+		// Double.MAX_VALUE is the "no threshold" sentinel used by HITS(int)
+		if (convergenceThreshold < Double.MAX_VALUE) {
+			passThrough = iterative
+				.fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE)
+				.where(0)
+				.equalTo(0)
+				.with(new ChangeInScores<K>())
+					.setParallelism(parallelism)
+					.name("Change in scores");
+
+			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
+		} else {
+			passThrough = scores;
+		}
+
+		return iterative
+			.closeWith(passThrough)
+			.map(new TranslateResult<K>())
+				.setParallelism(parallelism)
+				.name("Map result");
+	}
+
+	/**
+	 * Map edges and remove the edge value.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 *
+	 * @see Graph.ExtractEdgeIDsMapper
+	 */
+	@ForwardedFields("0; 1")
+	private static class ExtractEdgeIDs<T, ET>
+	implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
+		private Tuple2<T, T> output = new Tuple2<>();
+
+		@Override
+		public Tuple2<T, T> map(Edge<T, ET> value)
+				throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Initialize vertices' authority scores by emitting, for each edge, the
+	 * target vertex with an authority score of 1.0. After these records are
+	 * summed per vertex by {@link SumScores} each vertex's initial authority
+	 * score equals its in-degree. The hub scores are initialized to zero since
+	 * these will be computed based on the initial authority scores.
+	 *
+	 * <p>The initial scores are non-normalized.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("1->0")
+	private static class InitializeScores<T>
+	implements MapFunction<Tuple2<T, T>, Tuple3<T, DoubleValue, DoubleValue>> {
+		// (ID, hub = 0.0, authority = 1.0); only the ID changes per record
+		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(0.0), new DoubleValue(1.0));
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Exception {
+			output.f0 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Sum vertices' hub and authority scores.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class SumScores<T>
+	implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> reduce(Tuple3<T, DoubleValue, DoubleValue> left, Tuple3<T, DoubleValue, DoubleValue> right)
+				throws Exception {
+			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+			left.f2.setValue(left.f2.getValue() + right.f2.getValue());
+			return left;
+		}
+	}
+
+	/**
+	 * The hub score is the sum of authority scores of vertices on out-edges.
+	 * For each in-edge of a vertex this emits the edge source with the
+	 * vertex's authority score; the per-source sum is computed downstream.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("2->1")
+	@ForwardedFieldsSecond("0")
+	private static class Hubbiness<T>
+	implements CoGroupFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			output.f1 = vertex.iterator().next().f2;
+
+			for (Tuple2<T, T> edge : edges) {
+				output.f0 = edge.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * The authority score is the sum of hub scores of vertices on in-edges.
+	 * For each out-edge of a vertex this emits the edge target with the
+	 * vertex's hub score; the per-target sum is computed downstream.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("1")
+	@ForwardedFieldsSecond("1->0")
+	private static class Authority<T>
+	implements CoGroupFunction<Tuple2<T, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			output.f1 = vertex.iterator().next().f1;
+
+			for (Tuple2<T, T> edge : edges) {
+				output.f0 = edge.f1;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * Compute the square of each score.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class Square<T>
+	implements MapFunction<Tuple2<T, DoubleValue>, DoubleValue> {
+		private DoubleValue output = new DoubleValue();
+
+		@Override
+		public DoubleValue map(Tuple2<T, DoubleValue> value)
+				throws Exception {
+			double val = value.f1.getValue();
+			output.setValue(val * val);
+
+			return output;
+		}
+	}
+
+	/**
+	 * Sum over values. This specialized function is used in place of generic aggregation.
+	 */
+	private static class Sum
+	implements ReduceFunction<DoubleValue> {
+		@Override
+		public DoubleValue reduce(DoubleValue first, DoubleValue second)
+				throws Exception {
+			first.setValue(first.getValue() + second.getValue());
+			return first;
+		}
+	}
+
+	/**
+	 * Join and normalize the hub and authority scores. Each score is divided by
+	 * the square root of the corresponding broadcast sum of squared scores. A
+	 * vertex missing from one side of the full outer join receives a score of
+	 * 0.0 for that side.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("0")
+	private static class JoinAndNormalizeHubAndAuthority<T>
+	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
+		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(), new DoubleValue());
+
+		private double hubbinessRootSumSquared;
+
+		private double authorityRootSumSquared;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			hubbinessRootSumSquared = rootOfBroadcastSum(HUBBINESS_SUM_SQUARED);
+			authorityRootSumSquared = rootOfBroadcastSum(AUTHORITY_SUM_SQUARED);
+		}
+
+		/**
+		 * Read the single summed value from the named broadcast variable and
+		 * return its square root.
+		 *
+		 * <p>The broadcast set is empty when the graph has no edges, since the
+		 * sum is a reduce over an empty input. No records are joined in that
+		 * case, so NaN is returned instead of failing in {@code open()}.
+		 *
+		 * @param broadcastVariableName name of the broadcast sum of squares
+		 * @return square root of the broadcast sum, or NaN if the set is empty
+		 */
+		private double rootOfBroadcastSum(String broadcastVariableName) {
+			Collection<DoubleValue> sumSquared = getRuntimeContext().getBroadcastVariable(broadcastVariableName);
+			return sumSquared.isEmpty() ? Double.NaN : Math.sqrt(sumSquared.iterator().next().getValue());
+		}
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness, Tuple2<T, DoubleValue> authority)
+				throws Exception {
+			output.f0 = (authority == null) ? hubbiness.f0 : authority.f0;
+			output.f1.setValue(hubbiness == null ? 0.0 : hubbiness.f1.getValue() / hubbinessRootSumSquared);
+			output.f2.setValue(authority == null ? 0.0 : authority.f1.getValue() / authorityRootSumSquared);
+			return output;
+		}
+	}
+
+	/**
+	 * Computes the total sum of the change in hub and authority scores over
+	 * all vertices between iterations. A negative score is emitted after the
+	 * first iteration to prevent premature convergence.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("*")
+	private static class ChangeInScores<T>
+	extends RichJoinFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
+		private boolean isInitialSuperstep;
+
+		private double changeInScores;
+
+		@Override
+		public void open(Configuration parameters)
+				throws Exception {
+			super.open(parameters);
+
+			isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
+			changeInScores = (isInitialSuperstep) ? -1.0 : 0.0;
+		}
+
+		@Override
+		public void close()
+				throws Exception {
+			super.close();
+
+			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
+			agg.aggregate(changeInScores);
+		}
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleValue> first, Tuple3<T, DoubleValue, DoubleValue> second)
+				throws Exception {
+			// the initial scores are non-normalized so are not comparable
+			if (!isInitialSuperstep) {
+				changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
+				changeInScores += Math.abs(second.f2.getValue() - first.f2.getValue());
+			}
+
+			return second;
+		}
+	}
+
+	/**
+	 * Monitors the total change in hub and authority scores over all vertices.
+	 * The algorithm terminates when the change in scores compared against the
+	 * prior iteration falls to or below the given convergence threshold.
+	 *
+	 * <p>An optimization of this implementation of HITS is to leave the initial
+	 * scores non-normalized; therefore, the change in scores after the first
+	 * superstep cannot be measured and a negative value is emitted to signal
+	 * that the iteration should continue.
+	 */
+	private static class ScoreConvergence
+	implements ConvergenceCriterion<DoubleValue> {
+		private final double convergenceThreshold;
+
+		public ScoreConvergence(double convergenceThreshold) {
+			this.convergenceThreshold = convergenceThreshold;
+		}
+
+		@Override
+		public boolean isConverged(int iteration, DoubleValue value) {
+			double val = value.getValue();
+			return (0 <= val && val <= convergenceThreshold);
+		}
+	}
+
+	/**
+	 * Map the Tuple result to the return type.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0; 1; 2")
+	private static class TranslateResult<T>
+	implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
+		private Result<T> output = new Result<>();
+
+		@Override
+		public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			output.f2 = value.f2;
+			return output;
+		}
+	}
+
+	/**
+	 * Wraps the {@link Tuple3} to encapsulate results from the HITS algorithm.
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple3<T, DoubleValue, DoubleValue>
+	implements PrintableResult, UnaryResult<T> {
+		public static final int HASH_SEED = 0xc7e39a63;
+
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
+
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public void setVertexId0(T value) {
+			f0 = value;
+		}
+
+		/**
+		 * Get the hub score. Good hubs link to good authorities.
+		 *
+		 * @return the hub score
+		 */
+		public DoubleValue getHubScore() {
+			return f1;
+		}
+
+		/**
+		 * Get the authority score. Good authorities link to good hubs.
+		 *
+		 * @return the authority score
+		 */
+		public DoubleValue getAuthorityScore() {
+			return f2;
+		}
+
+		@Override
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
+				+ ", hub score: " + getHubScore()
+				+ ", authority score: " + getAuthorityScore();
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
+				.hash();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
new file mode 100644
index 0000000..ecd5f39
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.linkanalysis.Functions.SumScore;
+import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * PageRank computes a per-vertex score which is the sum of PageRank scores
+ * transmitted over in-edges. Each vertex's score is divided evenly among
+ * out-edges. High-scoring vertices are linked to by other high-scoring
+ * vertices; this is similar to the 'authority' score in {@link HITS}.
+ *
+ * <p>See http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class PageRank<K, VV, EV>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+
+	// broadcast variable name for the total number of vertices
+	private static final String VERTEX_COUNT = "vertex count";
+
+	// broadcast variable name for the sum of projected scores in an iteration
+	private static final String SUM_OF_SCORES = "sum of scores";
+
+	// name of the iteration aggregator used by the convergence criterion
+	private static final String CHANGE_IN_SCORES = "change in scores";
+
+	// Required configuration
+	private final double dampingFactor;
+
+	private int maxIterations;
+
+	// Double.MAX_VALUE disables the convergence check (see runInternal)
+	private double convergenceThreshold;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * PageRank run for a fixed number of iterations; no convergence threshold
+	 * is applied.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param iterations fixed number of iterations
+	 */
+	public PageRank(double dampingFactor, int iterations) {
+		this(dampingFactor, iterations, Double.MAX_VALUE);
+	}
+
+	/**
+	 * PageRank run until convergence. Iteration stops once the change in score
+	 * summed over all vertices is at most the given threshold.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public PageRank(double dampingFactor, double convergenceThreshold) {
+		this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
+	}
+
+	/**
+	 * PageRank bounded both by an iteration count and by a convergence
+	 * threshold; whichever is reached first ends the computation.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param maxIterations maximum number of iterations
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 * @throws IllegalArgumentException if the damping factor is not strictly
+	 *         between zero and one, or either limit is not positive
+	 */
+	public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
+		boolean dampingInOpenUnitInterval = 0 < dampingFactor && dampingFactor < 1;
+		Preconditions.checkArgument(dampingInOpenUnitInterval, "Damping factor must be between zero and one");
+
+		boolean positiveIterations = maxIterations > 0;
+		Preconditions.checkArgument(positiveIterations, "Number of iterations must be greater than zero");
+
+		boolean positiveThreshold = convergenceThreshold > 0.0;
+		Preconditions.checkArgument(positiveThreshold, "Convergence threshold must be greater than zero");
+
+		this.convergenceThreshold = convergenceThreshold;
+		this.maxIterations = maxIterations;
+		this.dampingFactor = dampingFactor;
+	}
+
+	/**
+	 * Set the parallelism used by every operator of this algorithm.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public PageRank<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	/**
+	 * @return the fully-qualified class name identifying this algorithm
+	 */
+	@Override
+	protected String getAlgorithmName() {
+		String name = PageRank.class.getName();
+		return name;
+	}
+
+	/**
+	 * Merge another PageRank configuration into this one so a single
+	 * computation can serve both. The larger iteration limit, the smaller
+	 * convergence threshold, and the smaller explicitly-set parallelism are
+	 * kept.
+	 *
+	 * <p>The damping factor changes the computed scores, so configurations
+	 * with different damping factors are never merged.
+	 *
+	 * @param other the configuration to merge
+	 * @return true if {@code other} is a compatible PageRank and was merged
+	 */
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (!PageRank.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		PageRank rhs = (PageRank) other;
+
+		// results computed with a different damping factor cannot be reused
+		if (dampingFactor != rhs.dampingFactor) {
+			return false;
+		}
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
+
+		return true;
+	}
+
+	/**
+	 * Builds the PageRank dataflow. Every vertex starts with a score of
+	 * 1 / vertex count. Each iteration projects every score evenly over the
+	 * vertex's out-edges, sums the projections per target, and adjusts the
+	 * sums for damping and for score lost at sink vertices. When a
+	 * convergence threshold was configured the change in scores is also
+	 * aggregated each superstep.
+	 *
+	 * @param input the input graph
+	 * @return one PageRank result per vertex
+	 * @throws Exception if building the dataflow fails
+	 */
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
+			throws Exception {
+		// vertex degree
+		DataSet<Vertex<K, Degrees>> vertexDegree = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// vertex count
+		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
+
+		// s, t, d(s)
+		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
+			.run(new EdgeSourceDegrees<K, VV, EV>()
+				.setParallelism(parallelism))
+			.map(new ExtractSourceDegree<K, EV>())
+				.setParallelism(parallelism)
+				.name("Extract source degree");
+
+		// vertices with zero in-edges
+		// (these receive no projected score so must be re-added each iteration)
+		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
+			.flatMap(new InitializeSourceVertices<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize source vertex scores");
+
+		// s, initial pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
+			.map(new InitializeVertexScores<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize scores");
+
+		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
+			.iterate(maxIterations);
+
+		// s, projected pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
+			.coGroup(edgeSourceDegree)
+			.where(0)
+			.equalTo(0)
+			.with(new SendScore<K>())
+				.setParallelism(parallelism)
+				.name("Send score")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ignored ID, total pagerank
+		// (a non-grouped reduce; emits nothing when vertexScores is empty)
+		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
+			.reduce(new SumVertexScores<K>())
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// s, adjusted pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
+			.union(sourceVertices)
+				.setParallelism(parallelism)
+				.name("Union with source vertices")
+			.map(new AdjustScores<K>(dampingFactor))
+			.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Adjust scores");
+
+		DataSet<Tuple2<K, DoubleValue>> passThrough;
+
+		// Double.MAX_VALUE is the "no threshold" sentinel used by PageRank(double, int)
+		if (convergenceThreshold < Double.MAX_VALUE) {
+			passThrough = iterative
+				.join(adjustedScores)
+				.where(0)
+				.equalTo(0)
+				.with(new ChangeInScores<K>())
+					.setParallelism(parallelism)
+					.name("Change in scores");
+
+			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
+		} else {
+			passThrough = adjustedScores;
+		}
+
+		return iterative
+			.closeWith(passThrough)
+			.map(new TranslateResult<K>())
+				.setParallelism(parallelism)
+				.name("Map result");
+	}
+
+	/**
+	 * Remove the unused original edge value and extract the out-degree of the
+	 * edge source, producing (source, target, out-degree(source)).
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static class ExtractSourceDegree<T, ET>
+	implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
+		// reused output object; private like every other function's output field
+		private Edge<T, LongValue> output = new Edge<>();
+
+		@Override
+		public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
+				throws Exception {
+			output.f0 = edge.f0;
+			output.f1 = edge.f1;
+			output.f2 = edge.f2.f1.getOutDegree();
+			return output;
+		}
+	}
+
+ /**
+ * Source vertices have no in-edges so have a projected score of 0.0.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0")
+ private static class InitializeSourceVertices<T>
+ implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+ private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));
+
+ @Override
+ public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
+ throws Exception {
+ if (vertex.f1.getInDegree().getValue() == 0) {
+ output.f0 = vertex.f0;
+ out.collect(output);
+ }
+ }
+ }
+
+ /**
+ * PageRank scores sum to 1.0 so initialize each vertex with the inverse of
+ * the number of vertices.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0")
+ private static class InitializeVertexScores<T>
+ extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+ private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+ @Override
+ public void open(Configuration parameters)
+ throws Exception {
+ super.open(parameters);
+
+ Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+ output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
+ }
+
+ @Override
+ public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
+ throws Exception {
+ output.f0 = vertex.f0;
+ return output;
+ }
+ }
+
+ /**
+ * The PageRank score for each vertex is divided evenly and projected to
+ * neighbors on out-edges.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFieldsSecond("1->0")
+ private static class SendScore<T>
+ implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
+ private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());
+
+ @Override
+ public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
+ throws Exception {
+ Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
+
+ if (edgeIterator.hasNext()) {
+ Edge<T, LongValue> edge = edgeIterator.next();
+
+ output.f0 = edge.f1;
+ output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
+ out.collect(output);
+
+ while (edgeIterator.hasNext()) {
+ edge = edgeIterator.next();
+ output.f0 = edge.f1;
+ out.collect(output);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sum the PageRank score over all vertices. The vertex ID carries no meaning
+ * in the result but is retained rather than adding another operator.
+ *
+ * <p>The first tuple is updated in place and returned.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0")
+ private static class SumVertexScores<T>
+ implements ReduceFunction<Tuple2<T, DoubleValue>> {
+ @Override
+ public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+ throws Exception {
+ DoubleValue runningTotal = first.f1;
+ runningTotal.setValue(runningTotal.getValue() + second.f1.getValue());
+ return first;
+ }
+ }
+
+ /**
+ * Each iteration the per-vertex scores are adjusted with the damping
+ * factor. Each score is multiplied by the damping factor then added to the
+ * probability of a "random hop", which is one minus the damping factor.
+ *
+ * <p>This operation also accounts for 'sink' vertices, which have no
+ * out-edges to project score to. The sink scores are computed by taking
+ * one minus the sum of vertex scores, which also includes precision error.
+ * This 'missing' score is evenly distributed across vertices as with the
+ * random hop.
+ *
+ * <p>An empty {@code SUM_OF_SCORES} broadcast set is treated as a sum of zero
+ * and an empty {@code VERTEX_COUNT} set as zero vertices. With zero vertices
+ * there is nothing to adjust, so the uniform score is left at zero rather than
+ * dividing by zero or failing in {@link #open}.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0")
+ private static class AdjustScores<T>
+ extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+ private final double dampingFactor;
+
+ private double uniformlyDistributedScore;
+
+ public AdjustScores(double dampingFactor) {
+ this.dampingFactor = dampingFactor;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+ Iterator<Tuple2<T, DoubleValue>> sumIterator = sumOfScores.iterator();
+
+ // an empty broadcast sum is the sum over no records, i.e. zero
+ double totalScore = sumIterator.hasNext() ? sumIterator.next().f1.getValue() : 0.0;
+
+ // floating point precision error is also included in sumOfSinks
+ double sumOfSinks = 1 - totalScore;
+
+ Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+ Iterator<LongValue> countIterator = vertexCount.iterator();
+ long numberOfVertices = countIterator.hasNext() ? countIterator.next().getValue() : 0;
+
+ // with no vertices there is nothing to adjust; avoid dividing by zero
+ this.uniformlyDistributedScore = (numberOfVertices == 0)
+ ? 0.0
+ : ((1 - dampingFactor) + dampingFactor * sumOfSinks) / numberOfVertices;
+ }
+
+ @Override
+ public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
+ value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
+ return value;
+ }
+ }
+
+ /**
+ * Computes the sum of the absolute change in vertex PageRank scores
+ * between iterations.
+ *
+ * <p>Each instance accumulates its own total while joining, resets it in
+ * {@link #open}, and contributes it to the {@code CHANGE_IN_SCORES}
+ * iteration aggregator in {@link #close}.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFieldsFirst("0")
+ @ForwardedFieldsSecond("*")
+ private static class ChangeInScores<T>
+ extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+ // running total of absolute score differences seen by this instance
+ private double changeInScores;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ changeInScores = 0.0;
+ }
+
+ @Override
+ public void close()
+ throws Exception {
+ super.close();
+
+ DoubleSumAggregator aggregator = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
+ aggregator.aggregate(changeInScores);
+ }
+
+ @Override
+ public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+ throws Exception {
+ double firstScore = first.f1.getValue();
+ double secondScore = second.f1.getValue();
+ changeInScores += Math.abs(secondScore - firstScore);
+
+ // the second input is forwarded unchanged
+ return second;
+ }
+ }
+
+ /**
+ * Monitors the sum of the absolute change in vertex scores. The algorithm
+ * terminates when the change in scores compared against the prior iteration
+ * falls to or below the given convergence threshold.
+ */
+ private static class ScoreConvergence
+ implements ConvergenceCriterion<DoubleValue> {
+ private final double convergenceThreshold;
+
+ public ScoreConvergence(double convergenceThreshold) {
+ this.convergenceThreshold = convergenceThreshold;
+ }
+
+ @Override
+ public boolean isConverged(int iteration, DoubleValue value) {
+ // converged once the aggregated change is no greater than the threshold
+ return value.getValue() <= convergenceThreshold;
+ }
+ }
+
+ /**
+ * Map the Tuple result to the return type.
+ *
+ * <p>Both fields are copied by reference into a single reused
+ * {@link Result} instance.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0; 1")
+ private static class TranslateResult<T>
+ implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
+ private Result<T> output = new Result<>();
+
+ @Override
+ public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
+ output.setFields(value.f0, value.f1);
+ return output;
+ }
+ }
+
+ /**
+ * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm.
+ * Field 0 holds the vertex ID and field 1 holds the PageRank score.
+ *
+ * @param <T> ID type
+ */
+ public static class Result<T>
+ extends Tuple2<T, DoubleValue>
+ implements PrintableResult, UnaryResult<T> {
+ public static final int HASH_SEED = 0x4010af29;
+
+ // reused by hashCode(); reset before each computation
+ private MurmurHash hasher = new MurmurHash(HASH_SEED);
+
+ @Override
+ public T getVertexId0() {
+ return f0;
+ }
+
+ @Override
+ public void setVertexId0(T value) {
+ f0 = value;
+ }
+
+ /**
+ * Get the PageRank score.
+ *
+ * @return the PageRank score
+ */
+ public DoubleValue getPageRankScore() {
+ return f1;
+ }
+
+ @Override
+ public String toPrintableString() {
+ StringBuilder builder = new StringBuilder("Vertex ID: ");
+ builder.append(getVertexId0());
+ builder.append(", PageRank score: ");
+ builder.append(getPageRankScore());
+ return builder.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ hasher.reset();
+ hasher.hash(f0.hashCode());
+ hasher.hash(f1.getValue());
+ return hasher.hash();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
index d2eeb41..ff4ad48 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
@@ -18,9 +18,9 @@
package org.apache.flink.graph.library.metric;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -35,7 +35,7 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
* @param <EV> edge value type
*/
public class ChecksumHashCode<K, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Checksum> {
+extends GraphAnalyticBase<K, VV, EV, Checksum> {
private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Vertex<K, VV>> vertexChecksum;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 82cc607..30d3563 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -18,8 +18,6 @@
package org.apache.flink.graph.library.metric.directed;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.accumulators.LongMaximum;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -31,10 +29,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.asm.result.PrintableResult;
@@ -42,13 +40,16 @@ import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import java.text.NumberFormat;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Compute the following edge metrics in a directed graph:
+ * Compute the following edge metrics in a directed graph.
* - number of triangle triplets
* - number of rectangle triplets
* - maximum number of triangle triplets
@@ -59,7 +60,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";
@@ -88,11 +89,11 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/*
* Implementation notes:
*
- * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
- * a hash-combineable hashable-reduce.
+ * <p>Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+ * a hash-combineable hashable-reduce.
*
- * Use distinct to replace ReduceEdgeStats when the combiner can be disabled
- * with a sorted-reduce forced.
+ * <p>Use distinct to replace ReduceEdgeStats when the combiner can be disabled
+ * with a sorted-reduce forced.
*/
@Override
@@ -147,7 +148,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
* the target vertex ID, the source degrees, and the low-order count. The
* second tuple is the same with the source and target roles reversed.
*
- * The low-order count is one if the source vertex degree is less than the
+ * <p>The low-order count is one if the source vertex degree is less than the
* target vertex degree or if the degrees are equal and the source vertex
* ID compares lower than the target vertex ID; otherwise the low-order
* count is zero.
@@ -346,11 +347,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(triangleTripletCount, rhs.triangleTripletCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 9764f6b..3931f65 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -18,27 +18,28 @@
package org.apache.flink.graph.library.metric.directed;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.accumulators.LongMaximum;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import java.text.NumberFormat;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Compute the following vertex metrics in a directed graph:
+ * Compute the following vertex metrics in a directed graph.
* - number of vertices
* - number of edges
* - number of unidirectional edges
@@ -55,7 +56,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private static final String VERTEX_COUNT = "vertexCount";
@@ -255,25 +256,25 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/**
* Get the average degree, the average number of in- plus out-edges per vertex.
*
- * A result of {@code Float.NaN} is returned for an empty graph for
+ * <p>A result of {@code Float.NaN} is returned for an empty graph for
* which both the number of edges and number of vertices is zero.
*
* @return average degree
*/
public double getAverageDegree() {
- return vertexCount == 0 ? Double.NaN : getNumberOfEdges() / (double)vertexCount;
+ return vertexCount == 0 ? Double.NaN : getNumberOfEdges() / (double) vertexCount;
}
/**
* Get the density, the ratio of actual to potential edges between vertices.
*
- * A result of {@code Float.NaN} is returned for a graph with fewer than
+ * <p>A result of {@code Float.NaN} is returned for a graph with fewer than
* two vertices for which the number of edges is zero.
*
* @return density
*/
public double getDensity() {
- return vertexCount <= 1 ? Double.NaN : getNumberOfEdges() / (double)(vertexCount*(vertexCount-1));
+ return vertexCount <= 1 ? Double.NaN : getNumberOfEdges() / (double) (vertexCount * (vertexCount - 1));
}
/**
@@ -358,11 +359,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(vertexCount, rhs.vertexCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 31f01d8..d1c7fb7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -18,8 +18,6 @@
package org.apache.flink.graph.library.metric.undirected;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.accumulators.LongMaximum;
import org.apache.flink.api.common.functions.MapFunction;
@@ -28,22 +26,25 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
import org.apache.flink.types.LongValue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import java.text.NumberFormat;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Compute the following edge metrics in an undirected graph:
+ * Compute the following edge metrics in an undirected graph.
* - number of triangle triplets
* - number of rectangle triplets
* - maximum number of triangle triplets
@@ -54,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";
@@ -101,8 +102,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/*
* Implementation notes:
*
- * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
- * a hash-combineable hashed-reduce.
+ * <p>Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+ * a hash-combineable hashed-reduce.
*/
@Override
@@ -319,11 +320,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(triangleTripletCount, rhs.triangleTripletCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index dd2411e..000f4e0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -18,27 +18,28 @@
package org.apache.flink.graph.library.metric.undirected;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.accumulators.LongMaximum;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
import org.apache.flink.types.LongValue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import java.text.NumberFormat;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Compute the following vertex metrics in an undirected graph:
+ * Compute the following vertex metrics in an undirected graph.
* - number of vertices
* - number of edges
* - average degree
@@ -51,7 +52,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private static final String VERTEX_COUNT = "vertexCount";
@@ -222,26 +223,26 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/**
* Get the average degree, the average number of edges per vertex.
*
- * A result of {@code Float.NaN} is returned for an empty graph for
+ * <p>A result of {@code Float.NaN} is returned for an empty graph for
* which both the number of edges and number of vertices is zero.
*
* @return average degree
*/
public double getAverageDegree() {
// each edge is incident on two vertices
- return vertexCount == 0 ? Double.NaN : 2 * edgeCount / (double)vertexCount;
+ return vertexCount == 0 ? Double.NaN : 2 * edgeCount / (double) vertexCount;
}
/**
* Get the density, the ratio of actual to potential edges between vertices.
*
- * A result of {@code Float.NaN} is returned for a graph with fewer than
+ * <p>A result of {@code Float.NaN} is returned for a graph with fewer than
* two vertices for which the number of edges is zero.
*
* @return density
*/
public double getDensity() {
- return vertexCount <= 1 ? Double.NaN : edgeCount / (double)(vertexCount*(vertexCount-1)/2);
+ return vertexCount <= 1 ? Double.NaN : edgeCount / (double) (vertexCount * (vertexCount - 1) / 2);
}
/**
@@ -301,11 +302,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(vertexCount, rhs.vertexCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 7df3235..e1bda93 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -37,7 +37,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.asm.result.BinaryResult;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.FloatValue;
@@ -54,17 +54,17 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
- * <p>
- * Adamic-Adar measures the similarity between pairs of vertices as the sum of
+ *
+ * <p>Adamic-Adar measures the similarity between pairs of vertices as the sum of
* the inverse logarithm of degree over shared neighbors. Scores are non-negative
* and unbounded. A vertex with higher degree has greater overall influence but
* is less influential to each pair of neighbors.
- * <p>
- * This implementation produces similarity scores for each pair of vertices
+ *
+ * <p>This implementation produces similarity scores for each pair of vertices
* in the graph with at least one shared neighbor; equivalently, this is the
* set of all non-zero Adamic-Adar coefficients.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
* edges or self-loops.
*
* @param <K> graph ID type
@@ -137,7 +137,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
- if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
+ if (!AdamicAdar.class.isAssignableFrom(other.getClass())) {
return false;
}
@@ -254,7 +254,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
long degree = value.f1.getValue();
// when the degree is one the logarithm is zero so avoid dividing by this value
- float inverseLogDegree = (degree == 1) ? 0.0f : 1.0f / (float)Math.log(value.f1.getValue());
+ float inverseLogDegree = (degree == 1) ? 0.0f : 1.0f / (float) Math.log(value.f1.getValue());
output.f2.setValue(inverseLogDegree);
return output;
@@ -266,7 +266,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*
* @param <T> ID type
*/
- @ForwardedFields("0->1; 1->2 ; 2->3")
+ @ForwardedFields("0->1; 1->2; 2->3")
private static class GenerateGroupSpans<T>
implements GroupReduceFunction<Tuple3<T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
private IntValue groupSpansValue = new IntValue();
@@ -309,7 +309,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
throws Exception {
int spans = value.f0.getValue();
- for (int idx = 0 ; idx < spans ; idx++ ) {
+ for (int idx = 0; idx < spans; idx++) {
value.f0.setValue(idx);
out.collect(value);
}
@@ -339,16 +339,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
output.f1 = edge.f2;
output.f2 = edge.f3;
- for (int i = 0 ; i < visitedCount ; i++) {
+ for (int i = 0; i < visitedCount; i++) {
output.f0 = visited.get(i);
out.collect(output);
}
if (visitedCount < GROUP_SIZE) {
- if (! initialized) {
+ if (!initialized) {
initialized = true;
- for (int i = 0 ; i < GROUP_SIZE ; i++) {
+ for (int i = 0; i < GROUP_SIZE; i++) {
visited.add(edge.f2.copy());
}
} else {
@@ -451,7 +451,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
public static final int HASH_SEED = 0xe405f6d1;
- private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+ private MurmurHash hasher = new MurmurHash(HASH_SEED);
/**
* No-args constructor.
@@ -482,7 +482,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
/**
* Get the Adamic-Adar score, equal to the sum over common neighbors of
- * the inverse logarithm of degree
+ * the inverse logarithm of degree.
*
* @return Adamic-Adar score
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index b3e69f1..35217c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -32,7 +32,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
import org.apache.flink.graph.asm.result.BinaryResult;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.IntValue;
@@ -50,12 +50,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* is computed as the number of shared neighbors divided by the number of
* distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all
* neighbors are shared).
- * <p>
- * This implementation produces similarity scores for each pair of vertices
+ *
+ * <p>This implementation produces similarity scores for each pair of vertices
* in the graph with at least one shared neighbor; equivalently, this is the
* set of all non-zero Jaccard Similarity coefficients.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
* edges or self-loops.
*
* @param <K> graph ID type
@@ -89,7 +89,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* pairs. Small groups generate more data whereas large groups distribute
* computation less evenly among tasks.
*
- * The default value should be near-optimal for all use cases.
+ * <p>The default value should be near-optimal for all use cases.
*
* @param groupSize the group size for the quadratic expansion of neighbor pairs
* @return this
@@ -180,7 +180,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
- if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
+ if (!JaccardIndex.class.isAssignableFrom(other.getClass())) {
return false;
}
@@ -267,11 +267,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* This is the first of three operations implementing a self-join to generate
* the full neighbor pairing for each vertex. The number of neighbor pairs
* is (n choose 2) which is quadratic in the vertex degree.
- * <p>
- * The third operation, {@link GenerateGroupPairs}, processes groups of size
+ *
+ * <p>The third operation, {@link GenerateGroupPairs}, processes groups of size
* {@link #groupSize} and emits {@code O(groupSize * deg(vertex))} pairs.
- * <p>
- * This input to the third operation is still quadratic in the vertex degree.
+ *
+ * <p>This input to the third operation is still quadratic in the vertex degree.
* Two prior operations, {@link GenerateGroupSpans} and {@link GenerateGroups},
* each emit datasets linear in the vertex degree, with a forced rebalance
* in between. {@link GenerateGroupSpans} first annotates each edge with the
@@ -310,7 +310,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// group span, u, v, d(v)
output.f1 = edge.f0;
output.f2 = edge.f1;
- output.f3.setValue((int)degree);
+ output.f3.setValue((int) degree);
out.collect(output);
@@ -337,7 +337,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
throws Exception {
int spans = value.f0.getValue();
- for (int idx = 0 ; idx < spans ; idx++ ) {
+ for (int idx = 0; idx < spans; idx++) {
value.f0.setValue(idx);
out.collect(value);
}
@@ -346,8 +346,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
/**
* Emits the two-path for all neighbor pairs in this group.
- * <p>
- * The first {@link #groupSize} vertices are emitted pairwise. Following
+ *
+ * <p>The first {@link #groupSize} vertices are emitted pairwise. Following
* vertices are only paired with vertices from this initial group.
*
* @param <T> ID type
@@ -373,7 +373,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
int visitedCount = 0;
for (Tuple4<IntValue, T, T, IntValue> edge : values) {
- for (int i = 0 ; i < visitedCount ; i++) {
+ for (int i = 0; i < visitedCount; i++) {
Tuple3<T, T, IntValue> prior = visited.get(i);
prior.f1 = edge.f2;
@@ -384,7 +384,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
if (degreeSum > Integer.MAX_VALUE) {
throw new RuntimeException("Degree sum overflows IntValue");
}
- prior.f2.setValue((int)degreeSum);
+ prior.f2.setValue((int) degreeSum);
// v, w, d(v) + d(w)
out.collect(prior);
@@ -393,10 +393,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
if (visitedCount < groupSize) {
- if (! initialized) {
+ if (!initialized) {
initialized = true;
- for (int i = 0 ; i < groupSize ; i++) {
+ for (int i = 0; i < groupSize; i++) {
Tuple3<T, T, IntValue> tuple = new Tuple3<>();
tuple.f0 = edge.f2.copy();
@@ -506,7 +506,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
public static final int HASH_SEED = 0x731f73e7;
- private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+ private MurmurHash hasher = new MurmurHash(HASH_SEED);
public Result() {
f2 = new IntValue();
@@ -584,8 +584,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// exact comparison of a/b with x/y using only integer math:
// a/b <?> x/y == a*y <?> b*x
- long ay = getSharedNeighborCount().getValue() * (long)o.getDistinctNeighborCount().getValue();
- long bx = getDistinctNeighborCount().getValue() * (long)o.getSharedNeighborCount().getValue();
+ long ay = getSharedNeighborCount().getValue() * (long) o.getDistinctNeighborCount().getValue();
+ long bx = getDistinctNeighborCount().getValue() * (long) o.getSharedNeighborCount().getValue();
return Long.compare(ay, bx);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index af25377..1f2d832 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -35,7 +35,7 @@ import java.util.Iterator;
/**
* The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}.
- *
+ *
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
* @param <EV> The type of the values that are associated with the edges.
@@ -53,10 +53,10 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
* This method is invoked once per superstep, for each active vertex.
* A vertex is active during a superstep, if at least one message was produced for it,
* in the previous superstep. During the first superstep, all vertices are active.
- * <p>
- * This method can iterate over all received messages, set the new vertex value, and
+ *
+ * <p>This method can iterate over all received messages, set the new vertex value, and
* send messages to other vertices (which will be delivered in the next superstep).
- *
+ *
* @param vertex The vertex executing this function
* @param messages The messages that were sent to this vertex in the previous superstep
* @throws Exception
@@ -65,23 +65,22 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
/**
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
*/
public void preSuperstep() throws Exception {}
-
+
/**
* This method is executed once per superstep after the vertex update function has been invoked for each vertex.
- *
+ *
* @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
*/
public void postSuperstep() throws Exception {}
-
-
+
/**
* Gets an {@link java.lang.Iterable} with all out-going edges. This method is mutually exclusive with
* {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
- *
+ *
* @return An iterator with all edges.
*/
public final Iterable<Edge<K, EV>> getEdges() {
@@ -93,7 +92,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
/**
* Sends the given message to all vertices that adjacent to the changed vertex.
* This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
- *
+ *
* @param m The message to send.
*/
public final void sendMessageToAllNeighbors(Message m) {
@@ -105,11 +104,11 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
out.collect(Either.Right(outMsg));
}
}
-
+
/**
* Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
* the next superstep will cause an exception due to a non-deliverable message.
- *
+ *
* @param target The key (id) of the target vertex to message.
* @param m The message.
*/
@@ -124,12 +123,12 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
/**
* Sets the new value of this vertex.
*
- * This should be called at most once per ComputeFunction.
- *
+ * <p>This should be called at most once per ComputeFunction.
+ *
* @param newValue The new vertex value.
*/
public final void setNewVertexValue(VV newValue) {
- if(setNewVertexValueCalled) {
+ if (setNewVertexValueCalled) {
throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
}
setNewVertexValueCalled = true;
@@ -138,43 +137,44 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
out.collect(Either.Left(outVertex));
}
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Gets the number of the superstep, starting at <tt>1</tt>.
- *
+ *
* @return The number of the current superstep.
*/
public final int getSuperstepNumber() {
return this.runtimeContext.getSuperstepNumber();
}
-
+
/**
* Gets the iteration aggregator registered under the given name. The iteration aggregator combines
* all aggregates globally once per superstep and makes them available in the next superstep.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregator registered under this name, or {@code null}, if no aggregator was registered.
*/
public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
return this.runtimeContext.getIterationAggregator(name);
}
-
+
/**
* Get the aggregated value that an aggregator computed in the previous iteration.
- *
+ *
* @param name The name of the aggregator.
* @return The aggregated value of the previous iteration.
*/
public final <T extends Value> T getPreviousIterationAggregate(String name) {
return this.runtimeContext.getPreviousIterationAggregate(name);
}
-
+
/**
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link org.apache.flink.graph.pregel.VertexCentricConfiguration#addBroadcastSet(String, DataSet)}.
- *
+ *
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
*/
@@ -189,26 +189,26 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
private Vertex<K, VV> outVertex;
private Tuple2<K, Message> outMsg;
-
+
private IterationRuntimeContext runtimeContext;
-
+
private Iterator<Edge<K, EV>> edges;
-
+
private Collector<Either<?, ?>> out;
-
+
private EdgesIterator<K, EV> edgeIterator;
-
+
private boolean edgesUsed;
private boolean setNewVertexValueCalled;
-
+
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
this.outVertex = new Vertex<>();
this.outMsg = new Tuple2<>();
this.edgeIterator = new EdgesIterator<>();
}
-
+
@SuppressWarnings("unchecked")
void set(K vertexId, Iterator<Edge<K, EV>> edges,
Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) {
@@ -228,17 +228,17 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
edgesUsed = true;
}
- private static final class EdgesIterator<K, EV>
- implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
- {
+ private static final class EdgesIterator<K, EV>
+ implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> {
+
private Iterator<Edge<K, EV>> input;
-
+
private Edge<K, EV> edge = new Edge<>();
void set(Iterator<Edge<K, EV>> input) {
this.input = input;
}
-
+
@Override
public boolean hasNext() {
return input.hasNext();
@@ -257,6 +257,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
public void remove() {
throw new UnsupportedOperationException();
}
+
@Override
public Iterator<Edge<K, EV>> iterator() {
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 6e51a3a..204bc9a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
/**
* The base class for combining messages sent during a {@link VertexCentricIteration}.
- *
+ *
* @param <K> The type of the vertex id
* @param <Message> The type of the message sent between vertices along the edges.
*/
@@ -49,7 +49,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
* Combines messages sent from different vertices to a target vertex.
* Implementing this method might reduce communication costs during a vertex-centric
* iteration.
- *
+ *
* @param messages the input messages to combine
* @throws Exception
*/
@@ -57,7 +57,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
/**
* Sends the combined message to the target vertex.
- *
+ *
* @param combinedMessage
* @throws Exception
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
index 5a17b90..f8dd926 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
@@ -18,12 +18,12 @@
package org.apache.flink.graph.pregel;
-import java.util.Iterator;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Either;
import org.apache.flink.types.NullValue;
+import java.util.Iterator;
+
/**
* An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
* the <i>foreach</i> syntax.
@@ -33,17 +33,17 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
private transient Iterator<Tuple2<?, Either<NullValue, Message>>> source;
private Message first = null;
-
- final void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
+
+ void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
this.source = source;
}
- final void setFirst(Message msg) {
+ void setFirst(Message msg) {
this.first = msg;
}
-
+
@Override
- public final boolean hasNext() {
+ public boolean hasNext() {
if (first != null) {
return true;
}
@@ -51,9 +51,9 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
return ((this.source != null) && (this.source.hasNext()));
}
}
-
+
@Override
- public final Message next() {
+ public Message next() {
if (first != null) {
Message toReturn = first;
first = null;
@@ -63,7 +63,7 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
}
@Override
- public final void remove() {
+ public void remove() {
throw new UnsupportedOperationException();
}
@@ -71,4 +71,4 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
public Iterator<Message> iterator() {
return this;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index a0f793a..69fcc52 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -30,13 +30,13 @@ import java.util.List;
* degree of parallelism, to register aggregators and use broadcast sets in
* the {@link org.apache.flink.graph.pregel.ComputeFunction}.
*
- * The VertexCentricConfiguration object is passed as an argument to
+ * <p>The VertexCentricConfiguration object is passed as an argument to
* {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
* org.apache.flink.graph.pregel.ComputeFunction, int, VertexCentricConfiguration)}.
*/
public class VertexCentricConfiguration extends IterationConfiguration {
- /** the broadcast variables for the compute function **/
+ /** The broadcast variables for the compute function. **/
private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>();
public VertexCentricConfiguration() {}