You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/09/08 21:12:45 UTC
[1/3] flink git commit: [FLINK-4522] [docs] Gelly link broken in
homepage
Repository: flink
Updated Branches:
refs/heads/master 0735b5b93 -> bdd3c0d94
[FLINK-4522] [docs] Gelly link broken in homepage
The Gelly documentation was recently split into multiple pages in
FLINK-4104 but was missing a redirect. This commit updates the Gelly
redirect to point to the old page.
This closes #2464
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95ad865b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95ad865b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95ad865b
Branch: refs/heads/master
Commit: 95ad865bd9d7121c80ecd4d1ed92e2912052a502
Parents: 0735b5b
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Sep 2 10:42:30 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Sep 8 17:06:19 2016 -0400
----------------------------------------------------------------------
docs/redirects/gelly.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/95ad865b/docs/redirects/gelly.md
----------------------------------------------------------------------
diff --git a/docs/redirects/gelly.md b/docs/redirects/gelly.md
index 8d7d850..c61107b 100644
--- a/docs/redirects/gelly.md
+++ b/docs/redirects/gelly.md
@@ -2,7 +2,7 @@
title: "Gelly"
layout: redirect
redirect: /dev/libs/gelly/index.html
-permalink: /apis/batch/libs/gelly/index.html
+permalink: /apis/batch/libs/gelly.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
[2/3] flink git commit: [FLINK-4257] [gelly] Handle delegating
algorithm change of class
Posted by gr...@apache.org.
[FLINK-4257] [gelly] Handle delegating algorithm change of class
Replaces Delegate with NoOpOperator.
This closes #2474
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8210ff46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8210ff46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8210ff46
Branch: refs/heads/master
Commit: 8210ff468d64fc50520011fc6fed9909d2a6b89a
Parents: 95ad865
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jul 25 09:09:27 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Sep 8 17:06:29 2016 -0400
----------------------------------------------------------------------
.../annotate/directed/EdgeDegreesPair.java | 6 +-
.../annotate/directed/EdgeSourceDegrees.java | 6 +-
.../annotate/directed/EdgeTargetDegrees.java | 6 +-
.../degree/annotate/directed/VertexDegrees.java | 6 +-
.../annotate/directed/VertexInDegree.java | 6 +-
.../annotate/directed/VertexOutDegree.java | 6 +-
.../annotate/undirected/EdgeDegreePair.java | 6 +-
.../annotate/undirected/EdgeSourceDegree.java | 6 +-
.../annotate/undirected/EdgeTargetDegree.java | 6 +-
.../annotate/undirected/VertexDegree.java | 6 +-
.../degree/filter/undirected/MaximumDegree.java | 6 +-
.../graph/asm/simple/directed/Simplify.java | 6 +-
.../graph/asm/simple/undirected/Simplify.java | 6 +-
.../asm/translate/TranslateEdgeValues.java | 6 +-
.../graph/asm/translate/TranslateGraphIds.java | 6 +-
.../asm/translate/TranslateVertexValues.java | 6 +-
.../directed/LocalClusteringCoefficient.java | 6 +-
.../clustering/directed/TriangleListing.java | 6 +-
.../undirected/LocalClusteringCoefficient.java | 6 +-
.../clustering/undirected/TriangleListing.java | 6 +-
.../flink/graph/library/link_analysis/HITS.java | 6 +-
.../graph/library/similarity/AdamicAdar.java | 6 +-
.../graph/library/similarity/JaccardIndex.java | 6 +-
.../flink/graph/utils/proxy/Delegate.java | 112 -------------
.../proxy/GraphAlgorithmDelegatingDataSet.java | 150 -----------------
.../proxy/GraphAlgorithmDelegatingGraph.java | 160 ------------------
.../proxy/GraphAlgorithmWrappingDataSet.java | 151 +++++++++++++++++
.../proxy/GraphAlgorithmWrappingGraph.java | 161 +++++++++++++++++++
28 files changed, 381 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index be19613..408516b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,7 +27,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -41,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeDegreesPair<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
@@ -64,7 +64,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, D
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index ee3175e..e55e3c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeSourceDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
@@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>>
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 6ba47f2..ed48f98 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeTargetDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
@@ -63,7 +63,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>>
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 84873bc..f4d734e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -33,7 +33,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.LongValue;
@@ -50,7 +50,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexDegrees<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -90,7 +90,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index f7ac18b..3f842a6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexInDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index e235f6a..0ec4fc1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexOutDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 1f78566..09ef975 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -26,7 +26,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -42,7 +42,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeDegreePair<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
// Optional configuration
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
@@ -85,7 +85,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue,
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 520723c..702fead 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeSourceDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
// Optional configuration
private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 123c1dc..724567e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -25,7 +25,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class EdgeTargetDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
// Optional configuration
private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
@@ -83,7 +83,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 42f084d..0f753fc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.degree.annotate.undirected;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -43,7 +43,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class VertexDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
@@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! VertexDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index f9cfae9..be19ffd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -29,7 +29,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
@@ -47,7 +47,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class MaximumDegree<K, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
// Required configuration
private long maximumDegree;
@@ -120,7 +120,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 983dac9..99ffe0d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Preconditions;
@@ -36,7 +36,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
// Optional configuration
private int parallelism = PARALLELISM_DEFAULT;
@@ -62,7 +62,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! Simplify.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index ce78cfa..45cd3f9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
+extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
// Required configuration
private boolean clipAndFlip;
@@ -80,7 +80,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! Simplify.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 6003c9a..bde826e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues
* @param <NEW> new edge value type
*/
public class TranslateEdgeValues<K, VV, OLD, NEW>
-extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
+extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
// Required configuration
private TranslateFunction<OLD,NEW> translator;
@@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 6ea56eb..2c67c5a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -38,7 +38,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
* @param <EV> edge value type
*/
public class TranslateGraphIds<OLD, NEW, VV, EV>
-extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
+extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
// Required configuration
private TranslateFunction<OLD,NEW> translator;
@@ -78,7 +78,7 @@ extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 3a49324..9e6784e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexValu
* @param <EV> edge value type
*/
public class TranslateVertexValues<K, OLD, NEW, EV>
-extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
+extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
// Required configuration
private TranslateFunction<OLD, NEW> translator;
@@ -76,7 +76,7 @@ extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
Preconditions.checkNotNull(other);
if (! TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 22c8b41..9d323a8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -31,8 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
@@ -57,7 +57,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
@@ -99,7 +99,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 14c731a..7df288a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -36,7 +36,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CopyableValue;
@@ -62,7 +62,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
@@ -103,7 +103,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 4b4bf07..293e3f9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -31,8 +31,8 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
@@ -57,7 +57,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
@@ -100,7 +100,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 89b86fe..c3dbf3e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
@@ -63,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
// Optional configuration
private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
@@ -104,7 +104,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 60e99bd..7ba6fee 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -40,7 +40,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class HITS<K, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private static final String CHANGE_IN_SCORES = "change in scores";
@@ -135,7 +135,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! HITS.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 512a7a0..1514866 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -37,7 +37,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
@@ -71,7 +71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private static final int GROUP_SIZE = 64;
@@ -133,7 +133,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 7783e6b..1e406fa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -61,7 +61,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
public static final int DEFAULT_GROUP_SIZE = 64;
@@ -159,7 +159,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
Preconditions.checkNotNull(other);
if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
deleted file mode 100644
index a2d724d..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import javassist.util.proxy.MethodFilter;
-import javassist.util.proxy.MethodHandler;
-import javassist.util.proxy.ProxyFactory;
-import javassist.util.proxy.ProxyObject;
-import org.objenesis.ObjenesisStd;
-
-import java.lang.reflect.Method;
-
-/**
- * Wraps an object with a proxy delegate whose method handler invokes all
- * method calls on the wrapped object. This object can be later replaced.
- *
- * @param <X> the type of the proxied object
- */
-public class Delegate<X> {
- private X obj;
-
- private X proxy = null;
-
- /**
- * Set the initial delegated object.
- *
- * @param obj delegated object
- */
- public Delegate(X obj) {
- setObject(obj);
- }
-
- /**
- * Change the delegated object.
- *
- * @param obj delegated object
- */
- public void setObject(X obj) {
- this.obj = (obj instanceof ReferentProxy) ? ((ReferentProxy<X>) obj).getProxiedObject() : obj;
- }
-
- /**
- * Instantiates and returns a proxy object which subclasses the
- * delegated object. The proxy's method handler invokes all methods
- * on the delegated object that is set at the time of invocation.
- *
- * @return delegating proxy
- */
- @SuppressWarnings("unchecked")
- public X getProxy() {
- if (proxy != null) {
- return proxy;
- }
-
- ProxyFactory factory = new ProxyFactory();
- factory.setSuperclass(obj.getClass());
- factory.setInterfaces(new Class[]{ReferentProxy.class});
-
- // create the class and instantiate an instance without calling a constructor
- Class<? extends X> proxyClass = factory.createClass(new MethodFilter() {
- @Override
- public boolean isHandled(Method method) {
- return true;
- }
- });
- proxy = new ObjenesisStd().newInstance(proxyClass);
-
- // create and set a handler to invoke all method calls on the delegated object
- ((ProxyObject) proxy).setHandler(new MethodHandler() {
- @Override
- public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
- if (thisMethod.getName().equals("getProxiedObject")) {
- // this method is provided by the ReferentProxy interface
- return obj;
- } else {
- // method visibility may be restricted
- thisMethod.setAccessible(true);
- return thisMethod.invoke(obj, args);
- }
- }
- });
-
- return proxy;
- }
-
- /**
- * This interface provides access via the proxy handler to the original
- * object being proxied. This is necessary since we cannot and should not
- * create a proxy of a proxy but must instead proxy the original object.
- *
- * @param <Y> the type of the proxied object
- */
- protected interface ReferentProxy<Y> {
- Y getProxiedObject();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
deleted file mode 100644
index 8e796e6..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
- * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant
- * {@link DataSet} with a delegating proxy object. The delegated object can be
- * replaced when the same algorithm is run on the same input with a mergeable
- * configuration. This allows algorithms to be composed of implicitly reusable
- * algorithms without publicly sharing intermediate {@link DataSet}s.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @param <T> output type
- */
-public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T>
-implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
-
- // each algorithm and input pair may map to multiple configurations
- private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache =
- Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>());
-
- private Graph<K,VV,EV> input;
-
- private Delegate<DataSet<T>> delegate;
-
- /**
- * Algorithms are identified by name rather than by class to allow subclassing.
- *
- * @return name of the algorithm, which may be shared by multiple classes
- * implementing the same algorithm and generating the same output
- */
- protected abstract String getAlgorithmName();
-
- /**
- * An algorithm must first test whether the configurations can be merged
- * before merging individual fields.
- *
- * @param other the algorithm with which to compare and merge
- * @return true if and only if configuration has been merged and the
- * algorithm's output can be reused
- */
- protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other);
-
- /**
- * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
- *
- * @param input the input graph
- * @return the algorithm's output
- * @throws Exception
- */
- protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception;
-
- @Override
- public final int hashCode() {
- return new HashCodeBuilder(17, 37)
- .append(input)
- .append(getAlgorithmName())
- .toHashCode();
- }
-
- @Override
- public final boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
-
- if (obj == this) {
- return true;
- }
-
- if (! GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) {
- return false;
- }
-
- GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj;
-
- return new EqualsBuilder()
- .append(input, rhs.input)
- .append(getAlgorithmName(), rhs.getAlgorithmName())
- .isEquals();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final DataSet<T> run(Graph<K, VV, EV> input)
- throws Exception {
- this.input = input;
-
- if (cache.containsKey(this)) {
- for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) {
- if (mergeConfiguration(other)) {
- // configuration has been merged so generate new output
- DataSet<T> output = runInternal(input);
-
- // update delegatee object and reuse delegate
- other.delegate.setObject(output);
- delegate = other.delegate;
-
- return delegate.getProxy();
- }
- }
- }
-
- // no mergeable configuration found so generate new output
- DataSet<T> output = runInternal(input);
-
- // create a new delegate to wrap the algorithm output
- delegate = new Delegate<>(output);
-
- // cache this result
- if (cache.containsKey(this)) {
- cache.get(this).add(this);
- } else {
- cache.put(this, new ArrayList(Collections.singletonList(this)));
- }
-
- return delegate.getProxy();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
deleted file mode 100644
index 705510a..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils.proxy;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
- * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant
- * {@link Graph} with a delegating proxy object. The delegated object can be
- * replaced when the same algorithm is run on the same input with a mergeable
- * configuration. This allows algorithms to be composed of implicitly reusable
- * algorithms without publicly sharing intermediate {@link DataSet}s.
- *
- * @param <IN_K> input ID type
- * @param <IN_VV> input vertex value type
- * @param <IN_EV> input edge value type
- * @param <OUT_K> output ID type
- * @param <OUT_VV> output vertex value type
- * @param <OUT_EV> output edge value type
- */
-public abstract class GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV>
-implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
-
- // each algorithm and input pair may map to multiple configurations
- private static Map<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>> cache =
- Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingGraph, List<GraphAlgorithmDelegatingGraph>>());
-
- private Graph<IN_K, IN_VV, IN_EV> input;
-
- private Delegate<DataSet<Vertex<OUT_K, OUT_VV>>> verticesDelegate;
-
- private Delegate<DataSet<Edge<OUT_K, OUT_EV>>> edgesDelegate;
-
- /**
- * Algorithms are identified by name rather than by class to allow subclassing.
- *
- * @return name of the algorithm, which may be shared by multiple classes
- * implementing the same algorithm and generating the same output
- */
- protected abstract String getAlgorithmName();
-
- /**
- * An algorithm must first test whether the configurations can be merged
- * before merging individual fields.
- *
- * @param other the algorithm with which to compare and merge
- * @return true if and only if configuration has been merged and the
- * algorithm's output can be reused
- */
- protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other);
-
- /**
- * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
- *
- * @param input the input graph
- * @return the algorithm's output
- * @throws Exception
- */
- protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception;
-
- @Override
- public final int hashCode() {
- return new HashCodeBuilder(17, 37)
- .append(input)
- .append(getAlgorithmName())
- .toHashCode();
- }
-
- @Override
- public final boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
-
- if (obj == this) {
- return true;
- }
-
- if (! GraphAlgorithmDelegatingGraph.class.isAssignableFrom(obj.getClass())) {
- return false;
- }
-
- GraphAlgorithmDelegatingGraph rhs = (GraphAlgorithmDelegatingGraph) obj;
-
- return new EqualsBuilder()
- .append(input, rhs.input)
- .append(getAlgorithmName(), rhs.getAlgorithmName())
- .isEquals();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input)
- throws Exception {
- this.input = input;
-
- if (cache.containsKey(this)) {
- for (GraphAlgorithmDelegatingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
- if (mergeConfiguration(other)) {
- // configuration has been merged so generate new output
- Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
-
- // update delegatee object and reuse delegate
- other.verticesDelegate.setObject(output.getVertices());
- verticesDelegate = other.verticesDelegate;
-
- other.edgesDelegate.setObject(output.getEdges());
- edgesDelegate = other.edgesDelegate;
-
- return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext());
- }
- }
- }
-
- // no mergeable configuration found so generate new output
- Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
-
- // create a new delegate to wrap the algorithm output
- verticesDelegate = new Delegate<>(output.getVertices());
- edgesDelegate = new Delegate<>(output.getEdges());
-
- // cache this result
- if (cache.containsKey(this)) {
- cache.get(this).add(this);
- } else {
- cache.put(this, new ArrayList(Collections.singletonList(this)));
- }
-
- return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
new file mode 100644
index 0000000..7a4a0e6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.NoOpOperator;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant
+ * {@link DataSet} with a {@code NoOpOperator}. The input to the wrapped
+ * operator can be replaced when the same algorithm is run on the same input
+ * with a mergeable configuration. This allows algorithms to be composed of
+ * implicitly reusable algorithms without publicly sharing intermediate
+ * {@link DataSet}s.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> output type
+ */
+public abstract class GraphAlgorithmWrappingDataSet<K, VV, EV, T>
+implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
+
+ // each algorithm and input pair may map to multiple configurations
+ private static Map<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>> cache =
+ Collections.synchronizedMap(new HashMap<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>>());
+
+ private Graph<K,VV,EV> input;
+
+ private NoOpOperator<T> wrappingOperator;
+
+ /**
+ * Algorithms are identified by name rather than by class to allow subclassing.
+ *
+ * @return name of the algorithm, which may be shared by multiple classes
+ * implementing the same algorithm and generating the same output
+ */
+ protected abstract String getAlgorithmName();
+
+ /**
+ * An algorithm must first test whether the configurations can be merged
+ * before merging individual fields.
+ *
+ * @param other the algorithm with which to compare and merge
+ * @return true if and only if configuration has been merged and the
+ * algorithm's output can be reused
+ */
+ protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other);
+
+ /**
+ * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
+ *
+ * @param input the input graph
+ * @return the algorithm's output
+ * @throws Exception
+ */
+ protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception;
+
+ @Override
+ public final int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(input)
+ .append(getAlgorithmName())
+ .toHashCode();
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (! GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) {
+ return false;
+ }
+
+ GraphAlgorithmWrappingDataSet rhs = (GraphAlgorithmWrappingDataSet) obj;
+
+ return new EqualsBuilder()
+ .append(input, rhs.input)
+ .append(getAlgorithmName(), rhs.getAlgorithmName())
+ .isEquals();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public final DataSet<T> run(Graph<K, VV, EV> input)
+ throws Exception {
+ this.input = input;
+
+ if (cache.containsKey(this)) {
+ for (GraphAlgorithmWrappingDataSet<K, VV, EV, T> other : cache.get(this)) {
+ if (mergeConfiguration(other)) {
+ // configuration has been merged so generate new output
+ DataSet<T> output = runInternal(input);
+
+ other.wrappingOperator.setInput(output);
+ wrappingOperator = other.wrappingOperator;
+
+ return wrappingOperator;
+ }
+ }
+ }
+
+ // no mergeable configuration found so generate new output
+ DataSet<T> output = runInternal(input);
+
+ // create a new operator to wrap the algorithm output
+ wrappingOperator = new NoOpOperator<>(output, output.getType());
+
+ // cache this result
+ if (cache.containsKey(this)) {
+ cache.get(this).add(this);
+ } else {
+ cache.put(this, new ArrayList(Collections.singletonList(this)));
+ }
+
+ return wrappingOperator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8210ff46/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
new file mode 100644
index 0000000..69a6c37
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.NoOpOperator;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant
+ * {@link Graph} vertex and edge sets with a {@code NoOpOperator}. The input to
+ * the wrapped operators can be replaced when the same algorithm is run on the
+ * same input with a mergeable configuration. This allows algorithms to be
+ * composed of implicitly reusable algorithms without publicly sharing
+ * intermediate {@link DataSet}s.
+ *
+ * @param <IN_K> input ID type
+ * @param <IN_VV> input vertex value type
+ * @param <IN_EV> input edge value type
+ * @param <OUT_K> output ID type
+ * @param <OUT_VV> output vertex value type
+ * @param <OUT_EV> output edge value type
+ */
+public abstract class GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV>
+implements GraphAlgorithm<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_EV>> {
+
+ // each algorithm and input pair may map to multiple configurations
+ private static Map<GraphAlgorithmWrappingGraph, List<GraphAlgorithmWrappingGraph>> cache =
+ Collections.synchronizedMap(new HashMap<GraphAlgorithmWrappingGraph, List<GraphAlgorithmWrappingGraph>>());
+
+ private Graph<IN_K, IN_VV, IN_EV> input;
+
+ private NoOpOperator<Vertex<OUT_K, OUT_VV>> verticesWrappingOperator;
+
+ private NoOpOperator<Edge<OUT_K, OUT_EV>> edgesWrappingOperator;
+
+ /**
+ * Algorithms are identified by name rather than by class to allow subclassing.
+ *
+ * @return name of the algorithm, which may be shared by multiple classes
+ * implementing the same algorithm and generating the same output
+ */
+ protected abstract String getAlgorithmName();
+
+ /**
+ * An algorithm must first test whether the configurations can be merged
+ * before merging individual fields.
+ *
+ * @param other the algorithm with which to compare and merge
+ * @return true if and only if configuration has been merged and the
+ * algorithm's output can be reused
+ */
+ protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingGraph other);
+
+ /**
+ * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
+ *
+ * @param input the input graph
+ * @return the algorithm's output
+ * @throws Exception
+ */
+ protected abstract Graph<OUT_K, OUT_VV, OUT_EV> runInternal(Graph<IN_K, IN_VV, IN_EV> input) throws Exception;
+
+ @Override
+ public final int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(input)
+ .append(getAlgorithmName())
+ .toHashCode();
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ if (! GraphAlgorithmWrappingGraph.class.isAssignableFrom(obj.getClass())) {
+ return false;
+ }
+
+ GraphAlgorithmWrappingGraph rhs = (GraphAlgorithmWrappingGraph) obj;
+
+ return new EqualsBuilder()
+ .append(input, rhs.input)
+ .append(getAlgorithmName(), rhs.getAlgorithmName())
+ .isEquals();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public final Graph<OUT_K, OUT_VV, OUT_EV> run(Graph<IN_K, IN_VV, IN_EV> input)
+ throws Exception {
+ this.input = input;
+
+ if (cache.containsKey(this)) {
+ for (GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
+ if (mergeConfiguration(other)) {
+ // configuration has been merged so generate new output
+ Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
+
+ other.verticesWrappingOperator.setInput(output.getVertices());
+ other.edgesWrappingOperator.setInput(output.getEdges());
+
+ verticesWrappingOperator = other.verticesWrappingOperator;
+ edgesWrappingOperator = other.edgesWrappingOperator;
+
+ return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext());
+ }
+ }
+ }
+
+ // no mergeable configuration found so generate new output
+ Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
+
+ // create a new operator to wrap the algorithm output
+ verticesWrappingOperator = new NoOpOperator<>(output.getVertices(), output.getVertices().getType());
+ edgesWrappingOperator = new NoOpOperator<>(output.getEdges(), output.getEdges().getType());
+
+ // cache this result
+ if (cache.containsKey(this)) {
+ cache.get(this).add(this);
+ } else {
+ cache.put(this, new ArrayList(Collections.singletonList(this)));
+ }
+
+ return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext());
+ }
+}
[3/3] flink git commit: [FLINK-4571] [gelly] Configurable little
parallelism in Gelly drivers
Posted by gr...@apache.org.
[FLINK-4571] [gelly] Configurable little parallelism in Gelly drivers
This closes #2475
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd3c0d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd3c0d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd3c0d9
Branch: refs/heads/master
Commit: bdd3c0d94b2a6cdecb482ee3fdefe082fc1b7c4d
Parents: 8210ff4
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Sep 2 11:53:08 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Sep 8 17:06:29 2016 -0400
----------------------------------------------------------------------
.../graph/examples/ClusteringCoefficient.java | 72 ++++++++++++++------
.../flink/graph/examples/JaccardIndex.java | 30 +++++---
.../annotate/directed/EdgeDegreesPair.java | 3 +-
.../annotate/directed/EdgeSourceDegrees.java | 3 +-
.../annotate/directed/EdgeTargetDegrees.java | 3 +-
.../degree/annotate/directed/VertexDegrees.java | 5 +-
.../annotate/directed/VertexInDegree.java | 7 +-
.../annotate/directed/VertexOutDegree.java | 7 +-
.../annotate/undirected/EdgeDegreePair.java | 3 +-
.../annotate/undirected/EdgeSourceDegree.java | 3 +-
.../annotate/undirected/EdgeTargetDegree.java | 3 +-
.../annotate/undirected/VertexDegree.java | 7 +-
.../degree/filter/undirected/MaximumDegree.java | 3 +-
.../graph/asm/simple/directed/Simplify.java | 3 +-
.../graph/asm/simple/undirected/Simplify.java | 3 +-
.../asm/translate/TranslateEdgeValues.java | 3 +-
.../graph/asm/translate/TranslateGraphIds.java | 3 +-
.../asm/translate/TranslateVertexValues.java | 3 +-
.../directed/LocalClusteringCoefficient.java | 3 +-
.../clustering/directed/TriangleListing.java | 5 +-
.../undirected/LocalClusteringCoefficient.java | 3 +-
.../clustering/undirected/TriangleListing.java | 5 +-
.../flink/graph/library/link_analysis/HITS.java | 3 +-
.../graph/library/similarity/AdamicAdar.java | 3 +-
.../graph/library/similarity/JaccardIndex.java | 3 +-
25 files changed, 124 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index e099e2b..f4b1ecf 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -42,6 +42,8 @@ import org.apache.flink.types.StringValue;
import java.text.NumberFormat;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
/**
* Driver for the library implementations of Global and Local Clustering Coefficient.
*
@@ -89,12 +91,15 @@ public class ClusteringCoefficient {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+
if (! parameters.has("directed")) {
printUsage();
return;
}
boolean directedAlgorithm = parameters.getBoolean("directed");
+ int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
// global and local clustering coefficient results
GraphAnalytic gcc;
DataSet lcc;
@@ -120,14 +125,18 @@ public class ClusteringCoefficient {
if (directedAlgorithm) {
gcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
gcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} break;
@@ -137,14 +146,18 @@ public class ClusteringCoefficient {
if (directedAlgorithm) {
gcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
gcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} break;
@@ -164,51 +177,66 @@ public class ClusteringCoefficient {
long edgeCount = vertexCount * edgeFactor;
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .setParallelism(little_parallelism)
.generate();
if (directedAlgorithm) {
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false));
+ .setIncludeZeroDegreeVertices(false)
+ .setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false));
+ .setIncludeZeroDegreeVertices(false)
+ .setLittleParallelism(little_parallelism));
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism));
gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false));
+ .setIncludeZeroDegreeVertices(false)
+ .setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism));
gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false));
+ .setIncludeZeroDegreeVertices(false)
+ .setLittleParallelism(little_parallelism));
}
}
} break;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 824aab7..96f66ab 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -43,6 +43,8 @@ import org.apache.flink.types.StringValue;
import java.text.NumberFormat;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
/**
* Driver for the library implementation of Jaccard Index.
*
@@ -87,6 +89,8 @@ public class JaccardIndex {
ParameterTool parameters = ParameterTool.fromArgs(args);
+ int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
DataSet ji;
switch (parameters.get("input", "")) {
@@ -107,13 +111,15 @@ public class JaccardIndex {
case "integer": {
ji = reader
.keyType(LongValue.class)
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} break;
case "string": {
ji = reader
.keyType(StringValue.class)
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} break;
default:
@@ -131,20 +137,26 @@ public class JaccardIndex {
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .setParallelism(little_parallelism)
.generate();
+ boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
if (scale > 32) {
ji = graph
- .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
+ .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
ji = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
- .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>());
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
+ .setParallelism(little_parallelism))
+ .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} break;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 408516b..6f808f3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -73,7 +73,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Deg
EdgeDegreesPair rhs = (EdgeDegreesPair) other;
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index e55e3c6..03fd1ba 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index ed48f98..7526d00 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -72,7 +72,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index f4d734e..a27ca29 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -108,7 +108,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
@@ -141,7 +142,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
.equalTo(0)
.with(new JoinVertexWithVertexDegrees<K, VV>())
.setParallelism(parallelism)
- .name("Join zero degree vertices");
+ .name("Zero degree vertices");
}
return vertexDegrees;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 3f842a6..934c4ed 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
@@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.getEdges()
.map(new MapEdgeToTargetId<K, EV>())
.setParallelism(parallelism)
- .name("Map edge to target ID");
+ .name("Edge to target ID");
// t, d(t)
DataSet<Vertex<K, LongValue>> targetDegree = targetIds
@@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
- .name("Join zero degree vertices");
+ .name("Zero degree vertices");
}
return targetDegree;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 0ec4fc1..a8745ca 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -101,7 +101,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
@@ -114,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.getEdges()
.map(new MapEdgeToSourceId<K, EV>())
.setParallelism(parallelism)
- .name("Map edge to source ID");
+ .name("Edge to source ID");
// s, d(s)
DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
@@ -131,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
- .name("Join zero degree vertices");
+ .name("Zero degree vertices");
}
return sourceDegree;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 09ef975..71b4891 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
EdgeDegreePair rhs = (EdgeDegreePair) other;
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 702fead..ee9a144 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
EdgeSourceDegree rhs = (EdgeSourceDegree) other;
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 724567e..1255d86 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
EdgeTargetDegree rhs = (EdgeTargetDegree) other;
reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 0f753fc..f466f85 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -122,7 +122,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
@@ -138,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.getEdges()
.map(mapEdgeToId)
.setParallelism(parallelism)
- .name("Map edge to vertex ID");
+ .name("Edge to vertex ID");
// v, deg(v)
DataSet<Vertex<K, LongValue>> degree = vertexIds
@@ -156,7 +157,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
- .name("Join zero degree vertices");
+ .name("Zero degree vertices");
}
return degree;
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index be19ffd..e5eea61 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -139,7 +139,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 99ffe0d..3d1fcee 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -71,7 +71,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
Simplify rhs = (Simplify) other;
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 45cd3f9..c3d8983 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -97,7 +97,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
// merge configurations
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index bde826e..b2b7594 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
// merge configurations
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 2c67c5a..e079a41 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -95,7 +95,8 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
// merge configurations
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 9e6784e..7447e11 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -93,7 +93,8 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
// merge configurations
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 9d323a8..608500b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 7df288a..e1b3040 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -113,7 +113,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
TriangleListing rhs = (TriangleListing) other;
sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}
@@ -162,7 +163,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<K>())
- .setParallelism(littleParallelism)
.name("Generate triplets");
// u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph
@@ -171,7 +171,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<K>())
- .setParallelism(littleParallelism)
.name("Triangle listing");
if (sortTriangleVertices.get()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 293e3f9..3621156 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -118,7 +118,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// merge configurations
includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index c3dbf3e..8850125 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -114,7 +114,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
TriangleListing rhs = (TriangleListing) other;
sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}
@@ -155,7 +156,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateTriplets<K>())
- .setParallelism(littleParallelism)
.name("Generate triplets");
// u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w
@@ -164,7 +164,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
.where(1, 2)
.equalTo(0, 1)
.with(new ProjectTriangles<K>())
- .setParallelism(littleParallelism)
.name("Triangle listing");
if (sortTriangleVertices.get()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 7ba6fee..9e3511c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -148,7 +148,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
maxIterations = Math.max(maxIterations, rhs.maxIterations);
convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
- parallelism = Math.min(parallelism, rhs.parallelism);
+ parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+ ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 1514866..00819e4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -151,7 +151,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// merge configurations
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd3c0d9/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 1e406fa..148d541 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -181,7 +181,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// merge configurations
groupSize = Math.max(groupSize, rhs.groupSize);
- littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
return true;
}