You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:17 UTC
[08/15] flink git commit: [FLINK-6709] [gelly] Activate strict
checkstyle for flink-gellies
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 5f28605..949dd4c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -18,12 +18,10 @@
package org.apache.flink.graph.library.clustering.directed;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
@@ -32,6 +30,9 @@ import org.apache.flink.graph.library.clustering.directed.TriadicCensus.Result;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import java.math.BigInteger;
import java.text.NumberFormat;
@@ -41,15 +42,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* A triad is formed by three connected or unconnected vertices in a graph.
* The triadic census counts the occurrences of each type of triad.
- * <p>
- * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * <p>See http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private TriangleListingHelper<K> triangleListingHelper;
@@ -101,24 +102,24 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
BigInteger three = BigInteger.valueOf(3);
BigInteger six = BigInteger.valueOf(6);
- BigInteger vertexCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "vc"));
- BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "uec") / 2);
- BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "bec") / 2);
- BigInteger triplet021dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021d"));
- BigInteger triplet021uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021u"));
- BigInteger triplet021cCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021c"));
- BigInteger triplet111dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111d"));
- BigInteger triplet111uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111u"));
- BigInteger triplet201Count = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "201"));
+ BigInteger vertexCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "vc"));
+ BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "uec") / 2);
+ BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "bec") / 2);
+ BigInteger triplet021dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021d"));
+ BigInteger triplet021uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021u"));
+ BigInteger triplet021cCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021c"));
+ BigInteger triplet111dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111d"));
+ BigInteger triplet111uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111u"));
+ BigInteger triplet201Count = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "201"));
// triads with three connecting edges = closed triplet = triangle
- BigInteger triangle030tCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030t"));
- BigInteger triangle030cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030c"));
- BigInteger triangle120dCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120d"));
- BigInteger triangle120uCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120u"));
- BigInteger triangle120cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120c"));
- BigInteger triangle210Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "210"));
- BigInteger triangle300Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "300"));
+ BigInteger triangle030tCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030t"));
+ BigInteger triangle030cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030c"));
+ BigInteger triangle120dCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120d"));
+ BigInteger triangle120uCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120u"));
+ BigInteger triangle120cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120c"));
+ BigInteger triangle210Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "210"));
+ BigInteger triangle300Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "300"));
// triads with two connecting edges = open triplet;
// each triangle deducts the count of three triplets
@@ -236,7 +237,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
long triangle210Count = 0;
long triangle300tCount = 0;
- for (int i = 0 ; i < typeTable.length ; i++) {
+ for (int i = 0; i < typeTable.length; i++) {
if (typeTable[i] == 9) {
triangle030tCount += triangleCount[i];
} else if (typeTable[i] == 10) {
@@ -509,7 +510,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/**
* Get the array of counts.
*
- * The order of the counts is from least to most connected:
+ * <p>The order of the counts is from least to most connected:
* 003, 012, 102, 021d, 021u, 021c, 111d, 111u,
* 030t, 030c, 201, 120d, 120u, 120c, 210, 300
*
@@ -550,11 +551,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(counts, rhs.counts)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 582c4b5..38b0746 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -53,13 +53,13 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* Generates a listing of distinct triangles from the input graph.
- * <p>
- * A triangle is a 3-clique with vertices A, B, and C connected by edges
+ *
+ * <p>A triangle is a 3-clique with vertices A, B, and C connected by edges
* (A, B), (A, C), and (B, C).
- * <p>
- * The input graph must not contain duplicate edges or self-loops.
- * <p>
- * This algorithm is similar to the undirected version but also tracks and
+ *
+ * <p>The input graph must not contain duplicate edges or self-loops.
+ *
+ * <p>This algorithm is similar to the undirected version but also tracks and
* computes a bitmask representing the six potential graph edges connecting
* the triangle vertices.
*
@@ -112,7 +112,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
- if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+ if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
return false;
}
@@ -258,9 +258,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*/
private static final class OrderByDegree<T extends Comparable<T>, ET>
implements MapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple3<T, T, ByteValue>> {
- private ByteValue forward = new ByteValue((byte)(EdgeOrder.FORWARD.getBitmask() << 2));
+ private ByteValue forward = new ByteValue((byte) (EdgeOrder.FORWARD.getBitmask() << 2));
- private ByteValue reverse = new ByteValue((byte)(EdgeOrder.REVERSE.getBitmask() << 2));
+ private ByteValue reverse = new ByteValue((byte) (EdgeOrder.REVERSE.getBitmask() << 2));
private Tuple3<T, T, ByteValue> output = new Tuple3<>();
@@ -319,17 +319,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
Tuple2<T, ByteValue> previous = visited.get(i);
output.f1 = previous.f0;
- output.f3.setValue((byte)(previous.f1.getValue() | bitmask));
+ output.f3.setValue((byte) (previous.f1.getValue() | bitmask));
// u, v, w, bitmask
out.collect(output);
}
- if (! iter.hasNext()) {
+ if (!iter.hasNext()) {
break;
}
- byte shiftedBitmask = (byte)(bitmask << 2);
+ byte shiftedBitmask = (byte) (bitmask << 2);
if (visitedCount == visited.size()) {
visited.add(new Tuple2<>(edge.f1.copy(), new ByteValue(shiftedBitmask)));
@@ -361,7 +361,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
output.f0 = triplet.f0;
output.f1 = triplet.f1;
output.f2 = triplet.f2;
- output.f3.setValue((byte)(triplet.f3.getValue() | edge.f2.getValue()));
+ output.f3.setValue((byte) (triplet.f3.getValue() | edge.f2.getValue()));
return output;
}
}
@@ -381,26 +381,26 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
if (value.f0.compareTo(value.f1) > 0) {
byte bitmask = value.f3.getValue();
- T temp_val = value.f0;
+ T tempVal = value.f0;
value.f0 = value.f1;
- if (temp_val.compareTo(value.f2) < 0) {
- value.f1 = temp_val;
+ if (tempVal.compareTo(value.f2) < 0) {
+ value.f1 = tempVal;
int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
int f0f2 = (bitmask & 0b001100) >>> 2;
int f1f2 = (bitmask & 0b000011) << 2;
- value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+ value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
} else {
value.f1 = value.f2;
- value.f2 = temp_val;
+ value.f2 = tempVal;
int f0f1 = (bitmask & 0b000011) << 4;
int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
- value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+ value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index e01892b..c2c2ad2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -18,18 +18,19 @@
package org.apache.flink.graph.library.clustering.undirected;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
import org.apache.flink.types.CopyableValue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.IOException;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -43,7 +44,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private static final String VERTEX_COUNT = "vertexCount";
@@ -181,11 +182,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(vertexCount, rhs.vertexCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index 2eac620..5377c1f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -18,17 +18,18 @@
package org.apache.flink.graph.library.clustering.undirected;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.asm.dataset.Count;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
import org.apache.flink.types.CopyableValue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
@@ -40,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private Count<TriangleListing.Result<K>> triangleCount;
@@ -141,13 +142,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
* number of closed triplets (triangles) divided by the total number of
* triplets.
*
- * A score of {@code Double.NaN} is returned for a graph of isolated vertices
+ * <p>A score of {@code Double.NaN} is returned for a graph of isolated vertices
* for which both the triangle count and number of neighbors are zero.
*
* @return global clustering coefficient score
*/
public double getGlobalClusteringCoefficientScore() {
- return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
+ return (tripletCount == 0) ? Double.NaN : triangleCount / (double) tripletCount;
}
@Override
@@ -167,11 +168,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(tripletCount, rhs.tripletCount)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 10f7aba..e94310f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -34,7 +34,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.UnaryResult;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
@@ -48,12 +48,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* The local clustering coefficient measures the connectedness of each vertex's
* neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
* (neighborhood is a clique).
- * <p>
- * An edge between a vertex's neighbors is a triangle. Counting edges between
+ *
+ * <p>An edge between a vertex's neighbors is a triangle. Counting edges between
* neighbors is equivalent to counting the number of triangles which include
* the vertex.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
* edges or self-loops.
*
* @param <K> graph ID type
@@ -107,7 +107,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
- if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+ if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
return false;
}
@@ -247,7 +247,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
implements PrintableResult, UnaryResult<T> {
private static final int HASH_SEED = 0xc23937c1;
- private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+ private MurmurHash hasher = new MurmurHash(HASH_SEED);
@Override
public T getVertexId0() {
@@ -283,7 +283,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* number of edges between neighbors, equal to the triangle count,
* divided by the number of potential edges between neighbors.
*
- * A score of {@code Double.NaN} is returned for a vertex with degree 1
+ * <p>A score of {@code Double.NaN} is returned for a vertex with degree 1
* for which both the triangle count and number of neighbors are zero.
*
* @return local clustering coefficient score
@@ -292,7 +292,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
long degree = getDegree().getValue();
long neighborPairs = degree * (degree - 1) / 2;
- return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
+ return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index 604621d..c5a323d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -18,11 +18,9 @@
package org.apache.flink.graph.library.clustering.undirected;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
import org.apache.flink.graph.asm.dataset.Count;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
@@ -30,6 +28,9 @@ import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.math.BigInteger;
import java.text.NumberFormat;
@@ -38,18 +39,18 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* A triad is formed by three connected or unconnected vertices in a graph.
* The triadic census counts the occurrences of each type of triad.
- * <p>
- * The four types of undirected triads are formed with 0, 1, 2, or 3
+ *
+ * <p>The four types of undirected triads are formed with 0, 1, 2, or 3
* connecting edges.
- * <p>
- * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * <p>See http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
private Count<TriangleListing.Result<K>> triangleCount;
@@ -203,7 +204,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
/**
* Get the array of counts.
*
- * The order of the counts is from least to most connected:
+ * <p>The order of the counts is from least to most connected:
* 03, 12, 21, 30
*
* @return array of counts
@@ -231,11 +232,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
@Override
public boolean equals(Object obj) {
- if (obj == null) { return false; }
- if (obj == this) { return true; }
- if (obj.getClass() != getClass()) { return false; }
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
- Result rhs = (Result)obj;
+ Result rhs = (Result) obj;
return new EqualsBuilder()
.append(counts, rhs.counts)
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index ee8dbaf..b281473 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -33,9 +33,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
-import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
@@ -51,16 +51,17 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
* Generates a listing of distinct triangles from the input graph.
- * <p>
- * A triangle is a 3-cycle with vertices A, B, and C connected by edges
+ *
+ * <p>A triangle is a 3-cycle with vertices A, B, and C connected by edges
* (A, B), (A, C), and (B, C).
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
* edges or self-loops.
- * <p>
- * Algorithm from "Finding, Counting and Listing all Triangles in Large Graphs,
+ *
+ * <p>Algorithm from "Finding, Counting and Listing all Triangles in Large Graphs,
* An Experimental Study", Thomas Schank and Dorothea Wagner.
- * http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf
+ *
+ * <p>See http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf
*
* @param <K> graph ID type
* @param <VV> vertex value type
@@ -111,7 +112,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
- if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+ if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
return false;
}
@@ -182,8 +183,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
/**
* Removes edge values while filtering such that only edges where the
* source vertex ID compares less than the target vertex ID are emitted.
- * <p>
- * Since the input graph is a simple graph this filter removes exactly half
+ *
+ * <p>Since the input graph is a simple graph this filter removes exactly half
* of the original edges.
*
* @param <T> ID type
@@ -210,8 +211,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* vertex has lower degree are emitted. If the source and target vertex
* degrees are equal then the edge is emitted if the source vertex ID
* compares less than the target vertex ID.
- * <p>
- * Since the input graph is a simple graph this filter removes exactly half
+ *
+ * <p>Since the input graph is a simple graph this filter removes exactly half
* of the original edges.
*
* @param <T> ID type
@@ -270,7 +271,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
out.collect(output);
}
- if (! iter.hasNext()) {
+ if (!iter.hasNext()) {
break;
}
@@ -319,14 +320,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
throws Exception {
// by the triangle listing algorithm we know f1 < f2
if (value.f0.compareTo(value.f1) > 0) {
- T temp_val = value.f0;
+ T tempVal = value.f0;
value.f0 = value.f1;
- if (temp_val.compareTo(value.f2) <= 0) {
- value.f1 = temp_val;
+ if (tempVal.compareTo(value.f2) <= 0) {
+ value.f1 = tempVal;
} else {
value.f1 = value.f2;
- value.f2 = temp_val;
+ value.f2 = tempVal;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
deleted file mode 100644
index a7d6ef1..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.DoubleValue;
-
-class Functions {
-
- private Functions() {}
-
- /**
- * Sum vertices' scores.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0")
- protected static final class SumScore<T>
- implements ReduceFunction<Tuple2<T, DoubleValue>> {
- @Override
- public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
- throws Exception {
- left.f1.setValue(left.f1.getValue() + right.f1.getValue());
- return left;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
deleted file mode 100644
index ba1ab21..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
-import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
-import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Hyperlink-Induced Topic Search computes two interdependent scores for every
- * vertex in a directed graph. A good "hub" links to good "authorities" and
- * good "authorities" are linked from good "hubs".
- *
- * <p>This algorithm can be configured to terminate either by a limit on the number
- * of iterations, a convergence threshold, or both.
- *
- * <p>http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class HITS<K, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
-	private static final String CHANGE_IN_SCORES = "change in scores";
-
-	private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared";
-
-	private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";
-
-	// Required configuration; not final because mergeConfiguration may widen these
-	private int maxIterations;
-
-	private double convergenceThreshold;
-
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
-	 *
-	 * @param iterations fixed number of iterations
-	 */
-	public HITS(int iterations) {
-		this(iterations, Double.MAX_VALUE);
-	}
-
-	/**
-	 * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
-	 * terminates when the total change in hub and authority scores over all
-	 * vertices falls to or below the given threshold value.
-	 *
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 */
-	public HITS(double convergenceThreshold) {
-		this(Integer.MAX_VALUE, convergenceThreshold);
-	}
-
-	/**
-	 * Hyperlink-Induced Topic Search with a convergence threshold and a maximum
-	 * iteration count. The algorithm terminates after either the given number
-	 * of iterations or when the total change in hub and authority scores over all
-	 * vertices falls to or below the given threshold value.
-	 *
-	 * @param maxIterations maximum number of iterations
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 * @throws IllegalArgumentException if either argument is not positive
-	 */
-	public HITS(int maxIterations, double convergenceThreshold) {
-		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
-		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
-
-		this.maxIterations = maxIterations;
-		this.convergenceThreshold = convergenceThreshold;
-	}
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism; must be positive or
-	 *                    {@code PARALLELISM_DEFAULT}
-	 * @return this
-	 * @throws IllegalArgumentException if the parallelism is neither positive
-	 *                                  nor {@code PARALLELISM_DEFAULT}
-	 */
-	public HITS<K, VV, EV> setParallelism(int parallelism) {
-		// fail fast here instead of when the operators are wired in runInternal
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected String getAlgorithmName() {
-		return HITS.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
-		Preconditions.checkNotNull(other);
-
-		if (! HITS.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		HITS rhs = (HITS) other;
-
-		// merge configurations
-
-		maxIterations = Math.max(maxIterations, rhs.maxIterations);
-		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
-	@Override
-	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
-			throws Exception {
-		DataSet<Tuple2<K, K>> edges = input
-			.getEdges()
-			.map(new ExtractEdgeIDs<K, EV>())
-				.setParallelism(parallelism)
-				.name("Extract edge IDs");
-
-		// ID, hub, authority
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
-			.map(new InitializeScores<K>())
-				.setParallelism(parallelism)
-				.name("Initial scores")
-			.groupBy(0)
-			.reduce(new SumScores<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
-
-		// ID, hubbiness
-		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative
-			.coGroup(edges)
-			.where(0)
-			.equalTo(1)
-			.with(new Hubbiness<K>())
-				.setParallelism(parallelism)
-				.name("Hub")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// sum-of-hubbiness-squared
-		DataSet<DoubleValue> hubbinessSumSquared = hubbiness
-			.map(new Square<K>())
-				.setParallelism(parallelism)
-				.name("Square")
-			.reduce(new Sum())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ID, new authority
-		DataSet<Tuple2<K, DoubleValue>> authority = hubbiness
-			.coGroup(edges)
-			.where(0)
-			.equalTo(0)
-			.with(new Authority<K>())
-				.setParallelism(parallelism)
-				.name("Authority")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// sum-of-authority-squared
-		DataSet<DoubleValue> authoritySumSquared = authority
-			.map(new Square<K>())
-				.setParallelism(parallelism)
-				.name("Square")
-			.reduce(new Sum())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ID, normalized hubbiness, normalized authority
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> scores = hubbiness
-			.fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE)
-			.where(0)
-			.equalTo(0)
-			.with(new JoinAndNormalizeHubAndAuthority<K>())
-			.withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
-			.withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
-				.setParallelism(parallelism)
-				.name("Join scores");
-
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> passThrough;
-
-		// Double.MAX_VALUE is the sentinel for "no convergence threshold"
-		if (convergenceThreshold < Double.MAX_VALUE) {
-			passThrough = iterative
-				.fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE)
-				.where(0)
-				.equalTo(0)
-				.with(new ChangeInScores<K>())
-					.setParallelism(parallelism)
-					.name("Change in scores");
-
-			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
-		} else {
-			passThrough = scores;
-		}
-
-		return iterative
-			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
-				.setParallelism(parallelism)
-				.name("Map result");
-	}
-
-	/**
-	 * Map edges and remove the edge value.
-	 *
-	 * @param <T> ID type
-	 * @param <ET> edge value type
-	 *
-	 * @see Graph.ExtractEdgeIDsMapper
-	 */
-	@ForwardedFields("0; 1")
-	private static class ExtractEdgeIDs<T, ET>
-	implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
-		private Tuple2<T, T> output = new Tuple2<>();
-
-		@Override
-		public Tuple2<T, T> map(Edge<T, ET> value)
-				throws Exception {
-			output.f0 = value.f0;
-			output.f1 = value.f1;
-			return output;
-		}
-	}
-
-	/**
-	 * Initialize vertices' authority scores by emitting, for the target of
-	 * each edge, an authority score of 1.0 and a hub score of 0.0. After the
-	 * subsequent per-vertex summation a vertex's initial authority score is
-	 * therefore its in-degree. The hub scores are initialized to zero since
-	 * these will be computed based on the initial authority scores.
-	 *
-	 * <p>The initial scores are non-normalized.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("1->0")
-	private static class InitializeScores<T>
-	implements MapFunction<Tuple2<T, T>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(0.0), new DoubleValue(1.0));
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Exception {
-			output.f0 = value.f1;
-			return output;
-		}
-	}
-
-	/**
-	 * Sum vertices' hub and authority scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class SumScores<T>
-	implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> reduce(Tuple3<T, DoubleValue, DoubleValue> left, Tuple3<T, DoubleValue, DoubleValue> right)
-				throws Exception {
-			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
-			left.f2.setValue(left.f2.getValue() + right.f2.getValue());
-			return left;
-		}
-	}
-
-	/**
-	 * The hub score is the sum of authority scores of vertices on out-edges.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("2->1")
-	@ForwardedFieldsSecond("0")
-	private static class Hubbiness<T>
-	implements CoGroupFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
-		@Override
-		public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			output.f1 = vertex.iterator().next().f2;
-
-			for (Tuple2<T, T> edge : edges) {
-				output.f0 = edge.f0;
-				out.collect(output);
-			}
-		}
-	}
-
-	/**
-	 * The authority score is the sum of hub scores of vertices on in-edges.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("1")
-	@ForwardedFieldsSecond("1->0")
-	private static class Authority<T>
-	implements CoGroupFunction<Tuple2<T, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
-		@Override
-		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			output.f1 = vertex.iterator().next().f1;
-
-			for (Tuple2<T, T> edge : edges) {
-				output.f0 = edge.f1;
-				out.collect(output);
-			}
-		}
-	}
-
-	/**
-	 * Compute the square of each score.
-	 *
-	 * @param <T> ID type
-	 */
-	private static class Square<T>
-	implements MapFunction<Tuple2<T, DoubleValue>, DoubleValue> {
-		private DoubleValue output = new DoubleValue();
-
-		@Override
-		public DoubleValue map(Tuple2<T, DoubleValue> value)
-				throws Exception {
-			double val = value.f1.getValue();
-			output.setValue(val * val);
-
-			return output;
-		}
-	}
-
-	/**
-	 * Sum over values. This specialized function is used in place of generic aggregation.
-	 */
-	private static class Sum
-	implements ReduceFunction<DoubleValue> {
-		@Override
-		public DoubleValue reduce(DoubleValue first, DoubleValue second)
-				throws Exception {
-			first.setValue(first.getValue() + second.getValue());
-			return first;
-		}
-	}
-
-	/**
-	 * Join and normalize the hub and authority scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
-	private static class JoinAndNormalizeHubAndAuthority<T>
-	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(), new DoubleValue());
-
-		private double hubbinessRootSumSquared;
-
-		private double authorityRootSumSquared;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			// The broadcast sums are produced by reductions that emit nothing
-			// when the graph has no edges. In that case this join also receives
-			// no records, so a placeholder is used instead of failing on an
-			// empty iterator.
-			Collection<DoubleValue> var;
-			var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
-			hubbinessRootSumSquared = rootOfSingleValue(var);
-
-			var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
-			authorityRootSumSquared = rootOfSingleValue(var);
-		}
-
-		/**
-		 * Square root of the single broadcast value, or NaN if the broadcast
-		 * set is empty.
-		 */
-		private static double rootOfSingleValue(Collection<DoubleValue> sumSquared) {
-			return sumSquared.isEmpty()
-				? Double.NaN
-				: Math.sqrt(sumSquared.iterator().next().getValue());
-		}
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness, Tuple2<T, DoubleValue> authority)
-				throws Exception {
-			// full outer join: either side may be missing for a given vertex
-			output.f0 = (authority == null) ? hubbiness.f0 : authority.f0;
-			output.f1.setValue(hubbiness == null ? 0.0 : hubbiness.f1.getValue() / hubbinessRootSumSquared);
-			output.f2.setValue(authority == null ? 0.0 : authority.f1.getValue() / authorityRootSumSquared);
-			return output;
-		}
-	}
-
-	/**
-	 * Computes the total sum of the change in hub and authority scores over
-	 * all vertices between iterations. A negative score is emitted after the
-	 * first iteration to prevent premature convergence.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("*")
-	private static class ChangeInScores<T>
-	extends RichJoinFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private boolean isInitialSuperstep;
-
-		private double changeInScores;
-
-		@Override
-		public void open(Configuration parameters)
-				throws Exception {
-			super.open(parameters);
-
-			isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
-			changeInScores = (isInitialSuperstep) ? -1.0 : 0.0;
-		}
-
-		@Override
-		public void close()
-				throws Exception {
-			super.close();
-
-			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
-			agg.aggregate(changeInScores);
-		}
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleValue> first, Tuple3<T, DoubleValue, DoubleValue> second)
-				throws Exception {
-			// in the first superstep the prior scores are non-normalized (and
-			// 'first' may be null), so no change is measured
-			if (! isInitialSuperstep) {
-				changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
-				changeInScores += Math.abs(second.f2.getValue() - first.f2.getValue());
-			}
-
-			return second;
-		}
-	}
-
-	/**
-	 * Monitors the total change in hub and authority scores over all vertices.
-	 * The algorithm terminates when the change in scores compared against the
-	 * prior iteration falls to or below the given convergence threshold.
-	 *
-	 * <p>An optimization of this implementation of HITS is to leave the initial
-	 * scores non-normalized; therefore, the change in scores after the first
-	 * superstep cannot be measured and a negative value is emitted to signal
-	 * that the iteration should continue.
-	 */
-	private static class ScoreConvergence
-	implements ConvergenceCriterion<DoubleValue> {
-		private final double convergenceThreshold;
-
-		public ScoreConvergence(double convergenceThreshold) {
-			this.convergenceThreshold = convergenceThreshold;
-		}
-
-		@Override
-		public boolean isConverged(int iteration, DoubleValue value) {
-			double val = value.getValue();
-			return (0 <= val && val <= convergenceThreshold);
-		}
-	}
-
-	/**
-	 * Map the Tuple result to the return type.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0; 1; 2")
-	private static class TranslateResult<T>
-	implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
-		private Result<T> output = new Result<>();
-
-		@Override
-		public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
-			output.f0 = value.f0;
-			output.f1 = value.f1;
-			output.f2 = value.f2;
-			return output;
-		}
-	}
-
-	/**
-	 * Wraps the {@link Tuple3} to encapsulate results from the HITS algorithm.
-	 *
-	 * @param <T> ID type
-	 */
-	public static class Result<T>
-	extends Tuple3<T, DoubleValue, DoubleValue>
-	implements PrintableResult, UnaryResult<T> {
-		public static final int HASH_SEED = 0xc7e39a63;
-
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
-
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
-
-		/**
-		 * Get the hub score. Good hubs link to good authorities.
-		 *
-		 * @return the hub score
-		 */
-		public DoubleValue getHubScore() {
-			return f1;
-		}
-
-		/**
-		 * Get the authority score. Good authorities link to good hubs.
-		 *
-		 * @return the authority score
-		 */
-		public DoubleValue getAuthorityScore() {
-			return f2;
-		}
-
-		public String toPrintableString() {
-			return "Vertex ID: " + getVertexId0()
-				+ ", hub score: " + getHubScore()
-				+ ", authority score: " + getAuthorityScore();
-		}
-
-		@Override
-		public int hashCode() {
-			return hasher.reset()
-				.hash(f0.hashCode())
-				.hash(f1.getValue())
-				.hash(f2.getValue())
-				.hash();
-		}
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
deleted file mode 100644
index 747735e..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
-import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
-import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
-import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
-import org.apache.flink.graph.library.link_analysis.PageRank.Result;
-import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * PageRank computes a per-vertex score which is the sum of PageRank scores
- * transmitted over in-edges. Each vertex's score is divided evenly among
- * out-edges. High-scoring vertices are linked to by other high-scoring
- * vertices; this is similar to the 'authority' score in {@link HITS}.
- *
- * http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class PageRank<K, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
-	// broadcast-variable name under which the vertex count is shared with operators
-	private static final String VERTEX_COUNT = "vertex count";
-
-	// broadcast-variable name for the per-superstep sum of projected scores
-	private static final String SUM_OF_SCORES = "sum of scores";
-
-	// name of the iteration aggregator that accumulates the change in scores
-	private static final String CHANGE_IN_SCORES = "change in scores";
-
-	// Required configuration
-	private final double dampingFactor;
-
-	// not final: mergeConfiguration may raise the iteration limit
-	private int maxIterations;
-
-	// not final: mergeConfiguration may lower the threshold; Double.MAX_VALUE disables it
-	private double convergenceThreshold;
-
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
- /**
- * PageRank with a fixed number of iterations.
- *
- * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
- * @param iterations fixed number of iterations
- */
- public PageRank(double dampingFactor, int iterations) {
- this(dampingFactor, iterations, Double.MAX_VALUE);
- }
-
- /**
- * PageRank with a convergence threshold. The algorithm terminates when the
- * change in score over all vertices falls to or below the given threshold value.
- *
- * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
- * @param convergenceThreshold convergence threshold for sum of scores
- */
- public PageRank(double dampingFactor, double convergenceThreshold) {
- this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
- }
-
- /**
- * PageRank with a convergence threshold and a maximum iteration count. The
- * algorithm terminates after either the given number of iterations or when
- * the change in score over all vertices falls to or below the given
- * threshold value.
- *
- * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
- * @param maxIterations maximum number of iterations
- * @param convergenceThreshold convergence threshold for sum of scores
- */
- public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
- Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1,
- "Damping factor must be between zero and one");
- Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
- Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
-
- this.dampingFactor = dampingFactor;
- this.maxIterations = maxIterations;
- this.convergenceThreshold = convergenceThreshold;
- }
-
- /**
- * Override the operator parallelism.
- *
- * @param parallelism operator parallelism
- * @return this
- */
- public PageRank<K, VV, EV> setParallelism(int parallelism) {
- this.parallelism = parallelism;
-
- return this;
- }
-
- @Override
- protected String getAlgorithmName() {
- return PageRank.class.getName();
- }
-
-	/**
-	 * Merges the configuration of another PageRank into this one so that a
-	 * single computation can serve both. The merged run uses the larger
-	 * iteration limit, the smaller convergence threshold and the smaller
-	 * explicit parallelism.
-	 *
-	 * @param other the algorithm to merge with
-	 * @return whether the configurations were merged; {@code false} if
-	 *         {@code other} is not a PageRank or uses a different damping factor
-	 */
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
-		Preconditions.checkNotNull(other);
-
-		if (! PageRank.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		PageRank rhs = (PageRank) other;
-
-		// The damping factor changes every computed score, so instances with
-		// different damping factors cannot share a result. Checked before any
-		// field of this instance is modified.
-		if (Double.compare(dampingFactor, rhs.dampingFactor) != 0) {
-			return false;
-		}
-
-		// merge configurations
-
-		maxIterations = Math.max(maxIterations, rhs.maxIterations);
-		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
-	/**
-	 * Builds the PageRank dataflow: initial scores of 1/|V| are iterated by
-	 * projecting each vertex's score over its out-edges (divided by its
-	 * out-degree), summing per target, then applying the damping factor and
-	 * redistributing the score lost to sinks.
-	 */
-	@Override
-	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
-			throws Exception {
-		// vertex degree
-		DataSet<Vertex<K, Degrees>> vertexDegree = input
-			.run(new VertexDegrees<K, VV, EV>()
-				.setParallelism(parallelism));
-
-		// vertex count
-		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
-
-		// s, t, d(s)
-		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
-			.run(new EdgeSourceDegrees<K, VV, EV>()
-				.setParallelism(parallelism))
-			.map(new ExtractSourceDegree<K, EV>())
-				.setParallelism(parallelism)
-				.name("Extract source degree");
-
-		// vertices with zero in-edges; SendScore never emits for these, so they
-		// are unioned back in below to receive an adjusted score
-		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
-			.flatMap(new InitializeSourceVertices<K>())
-			.withBroadcastSet(vertexCount, VERTEX_COUNT)
-				.setParallelism(parallelism)
-				.name("Initialize source vertex scores");
-
-		// s, initial pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
-			.map(new InitializeVertexScores<K>())
-			.withBroadcastSet(vertexCount, VERTEX_COUNT)
-				.setParallelism(parallelism)
-				.name("Initialize scores");
-
-		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
-
-		// s, projected pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
-			.coGroup(edgeSourceDegree)
-			.where(0)
-			.equalTo(0)
-			.with(new SendScore<K>())
-				.setParallelism(parallelism)
-				.name("Send score")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ignored ID, total pagerank
-		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
-			.reduce(new SumVertexScores<K>())
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// s, adjusted pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
-			.union(sourceVertices)
-				.setParallelism(parallelism)
-				.name("Union with source vertices")
-			.map(new AdjustScores<K>(dampingFactor))
-				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
-				.withBroadcastSet(vertexCount, VERTEX_COUNT)
-				.setParallelism(parallelism)
-				.name("Adjust scores");
-
-		DataSet<Tuple2<K, DoubleValue>> passThrough;
-
-		// Double.MAX_VALUE is the sentinel for "no convergence threshold"
-		if (convergenceThreshold < Double.MAX_VALUE) {
-			passThrough = iterative
-				.join(adjustedScores)
-				.where(0)
-				.equalTo(0)
-				.with(new ChangeInScores<K>())
-					.setParallelism(parallelism)
-					.name("Change in scores");
-
-			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
-		} else {
-			passThrough = adjustedScores;
-		}
-
-		return iterative
-			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
-				.setParallelism(parallelism)
-				.name("Map result");
-	}
-
- /**
- * Remove the unused original edge value and extract the out-degree.
- *
- * @param <T> ID type
- * @param <ET> edge value type
- */
- @ForwardedFields("0; 1")
- private static class ExtractSourceDegree<T, ET>
- implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
- Edge<T, LongValue> output = new Edge<>();
-
- @Override
- public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
- throws Exception {
- output.f0 = edge.f0;
- output.f1 = edge.f1;
- output.f2 = edge.f2.f1.getOutDegree();
- return output;
- }
- }
-
- /**
- * Source vertices have no in-edges so have a projected score of 0.0.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0")
- private static class InitializeSourceVertices<T>
- implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
- private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));
-
- @Override
- public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
- throws Exception {
- if (vertex.f1.getInDegree().getValue() == 0) {
- output.f0 = vertex.f0;
- out.collect(output);
- }
- }
- }
-
- /**
- * PageRank scores sum to 1.0 so initialize each vertex with the inverse of
- * the number of vertices.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0")
- private static class InitializeVertexScores<T>
- extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
- private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
- @Override
- public void open(Configuration parameters)
- throws Exception {
- super.open(parameters);
-
- Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
- output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
- }
-
- @Override
- public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
- throws Exception {
- output.f0 = vertex.f0;
- return output;
- }
- }
-
- /**
- * The PageRank score for each vertex is divided evenly and projected to
- * neighbors on out-edges.
- *
- * @param <T> ID type
- */
- @ForwardedFieldsSecond("1->0")
- private static class SendScore<T>
- implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
- private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());
-
- @Override
- public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
- throws Exception {
- Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
-
- if (edgeIterator.hasNext()) {
- Edge<T, LongValue> edge = edgeIterator.next();
-
- output.f0 = edge.f1;
- output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
- out.collect(output);
-
- while (edgeIterator.hasNext()) {
- edge = edgeIterator.next();
- output.f0 = edge.f1;
- out.collect(output);
- }
- }
- }
- }
-
- /**
- * Sum the PageRank score over all vertices. The vertex ID must be ignored
- * but is retained rather than adding another operator.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0")
- private static class SumVertexScores<T>
- implements ReduceFunction<Tuple2<T, DoubleValue>> {
- @Override
- public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
- throws Exception {
- first.f1.setValue(first.f1.getValue() + second.f1.getValue());
- return first;
- }
- }
-
- /**
- * Each iteration the per-vertex scores are adjusted with the damping
- * factor. Each score is multiplied by the damping factor then added to the
- * probability of a "random hop", which is one minus the damping factor.
- *
- * This operation also accounts for 'sink' vertices, which have no
- * out-edges to project score to. The sink scores are computed by taking
- * one minus the sum of vertex scores, which also includes precision error.
- * This 'missing' score is evenly distributed across vertices as with the
- * random hop.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0")
- private static class AdjustScores<T>
- extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
- private double dampingFactor;
-
- private long vertexCount;
-
- private double uniformlyDistributedScore;
-
- public AdjustScores(double dampingFactor) {
- this.dampingFactor = dampingFactor;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
- // floating point precision error is also included in sumOfSinks
- double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
-
- Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
- this.vertexCount = vertexCount.iterator().next().getValue();
-
- this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
- }
-
- @Override
- public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
- value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
- return value;
- }
- }
-
- /**
- * Computes the sum of the absolute change in vertex PageRank scores
- * between iterations.
- *
- * @param <T> ID type
- */
- @ForwardedFieldsFirst("0")
- @ForwardedFieldsSecond("*")
- private static class ChangeInScores<T>
- extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
- private double changeInScores;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- changeInScores = 0.0;
- }
-
- @Override
- public void close()
- throws Exception {
- super.close();
-
- DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
- agg.aggregate(changeInScores);
- }
-
- @Override
- public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
- throws Exception {
- changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
- return second;
- }
- }
-
- /**
- * Convergence criterion over the aggregated sum of absolute score changes.
- * The iteration is reported as converged once that sum is less than or
- * equal to the configured threshold.
- */
- private static class ScoreConvergence
- implements ConvergenceCriterion<DoubleValue> {
- private double convergenceThreshold;
-
- public ScoreConvergence(double convergenceThreshold) {
- this.convergenceThreshold = convergenceThreshold;
- }
-
- @Override
- public boolean isConverged(int iteration, DoubleValue value) {
- // a NaN change compares false here and so never signals convergence
- return value.getValue() <= convergenceThreshold;
- }
- }
-
- /**
- * Copies each (vertex ID, score) tuple into a single reused
- * {@link Result} instance.
- *
- * @param <T> ID type
- */
- @ForwardedFields("0; 1")
- private static class TranslateResult<T>
- implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
- private Result<T> output = new Result<>();
-
- @Override
- public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
- // references are copied, so the score object is shared with the input
- output.setVertexId0(value.f0);
- output.f1 = value.f1;
- return output;
- }
- }
-
- /**
- * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm.
- *
- * @param <T> ID type
- */
- public static class Result<T>
- extends Tuple2<T, DoubleValue>
- implements PrintableResult, UnaryResult<T> {
- public static final int HASH_SEED = 0x4010af29;
-
- private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
-
- @Override
- public T getVertexId0() {
- return f0;
- }
-
- @Override
- public void setVertexId0(T value) {
- f0 = value;
- }
-
- /**
- * Get the PageRank score.
- *
- * @return the PageRank score
- */
- public DoubleValue getPageRankScore() {
- return f1;
- }
-
- @Override
- public String toPrintableString() {
- return "Vertex ID: " + getVertexId0()
- + ", PageRank score: " + getPageRankScore();
- }
-
- @Override
- public int hashCode() {
- return hasher.reset()
- .hash(f0.hashCode())
- .hash(f1.getValue())
- .hash();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
new file mode 100644
index 0000000..0fdd46a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * Non-instantiable holder for functions available to classes in the
+ * {@code org.apache.flink.graph.library.linkanalysis} package.
+ */
+final class Functions {
+
+ private Functions() {}
+
+ /**
+ * Sum vertices' scores.
+ *
+ * <p>The right record's score is added into the left record in place, and
+ * the left record is returned. The left record's field 0 is therefore
+ * unchanged, which is why field 0 is declared as forwarded.
+ *
+ * @param <T> ID type
+ */
+ @ForwardedFields("0")
+ protected static final class SumScore<T>
+ implements ReduceFunction<Tuple2<T, DoubleValue>> {
+ @Override
+ public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
+ throws Exception {
+ left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+ return left;
+ }
+ }
+}