You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/05 20:53:11 UTC
[6/6] flink git commit: [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms
[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms
Gelly algorithms commonly have a Result class extending a Tuple type and
implementing one of the Unary/Binary/TertiaryResult interfaces.
Add a Unary/Binary/TertiaryResultBase class implementing each interface
and convert the Result classes to POJOs extending the base result
classes.
This closes #4201
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a15004
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a15004
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a15004
Branch: refs/heads/master
Commit: 98a15004cd815d14f4c243dd25a13aa4b122112c
Parents: 7c150a6
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 26 10:21:50 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 5 15:22:48 2017 -0400
----------------------------------------------------------------------
.../apache/flink/graph/drivers/input/CSV.java | 7 +-
.../drivers/input/GeneratedMultiGraph.java | 7 +-
.../flink/graph/drivers/input/RMatGraph.java | 7 +-
.../drivers/parameter/ChoiceParameter.java | 7 +-
.../drivers/parameter/StringParameter.java | 7 +-
.../graph/drivers/TriangleListingITCase.java | 24 +--
.../apache/flink/graph/asm/dataset/Collect.java | 2 -
.../graph/asm/dataset/DataSetAnalytic.java | 27 +--
.../annotate/directed/EdgeDegreesPair.java | 8 +-
.../annotate/directed/EdgeSourceDegrees.java | 8 +-
.../annotate/directed/EdgeTargetDegrees.java | 8 +-
.../degree/annotate/directed/VertexDegrees.java | 8 +-
.../annotate/directed/VertexInDegree.java | 8 +-
.../annotate/directed/VertexOutDegree.java | 8 +-
.../annotate/undirected/EdgeDegreePair.java | 8 +-
.../annotate/undirected/EdgeSourceDegree.java | 8 +-
.../annotate/undirected/EdgeTargetDegree.java | 8 +-
.../annotate/undirected/VertexDegree.java | 8 +-
.../degree/filter/undirected/MaximumDegree.java | 8 +-
.../flink/graph/asm/result/BinaryResult.java | 28 ++-
.../graph/asm/result/BinaryResultBase.java | 53 +++++
.../flink/graph/asm/result/ResultBase.java | 36 ++++
.../flink/graph/asm/result/TertiaryResult.java | 5 +-
.../graph/asm/result/TertiaryResultBase.java | 66 +++++++
.../flink/graph/asm/result/UnaryResult.java | 5 +-
.../flink/graph/asm/result/UnaryResultBase.java | 42 ++++
.../graph/asm/simple/directed/Simplify.java | 8 +-
.../graph/asm/simple/undirected/Simplify.java | 8 +-
.../asm/translate/TranslateEdgeValues.java | 8 +-
.../graph/asm/translate/TranslateGraphIds.java | 8 +-
.../asm/translate/TranslateVertexValues.java | 8 +-
.../library/clustering/TriangleListingBase.java | 91 +++++++++
.../directed/LocalClusteringCoefficient.java | 97 ++++++----
.../clustering/directed/TriadicCensus.java | 34 ++--
.../clustering/directed/TriangleListing.java | 192 +++++++------------
.../undirected/LocalClusteringCoefficient.java | 99 ++++++----
.../clustering/undirected/TriangleListing.java | 145 ++++----------
.../flink/graph/library/linkanalysis/HITS.java | 89 +++++----
.../graph/library/linkanalysis/PageRank.java | 70 ++++---
.../graph/library/similarity/AdamicAdar.java | 99 +++++-----
.../graph/library/similarity/JaccardIndex.java | 151 +++++++--------
.../utils/proxy/GraphAlgorithmWrappingBase.java | 61 ++++++
.../proxy/GraphAlgorithmWrappingDataSet.java | 33 +---
.../proxy/GraphAlgorithmWrappingGraph.java | 32 +---
.../directed/TriangleListingTest.java | 4 +-
.../undirected/TriangleListingTest.java | 2 +-
.../graph/library/linkanalysis/HITSTest.java | 4 +-
.../library/linkanalysis/PageRankTest.java | 12 +-
48 files changed, 933 insertions(+), 733 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
index 697da97..7193e5f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -70,12 +70,7 @@ extends InputBase<K, NullValue, NullValue> {
return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
}
- /**
- * Generate the graph as configured.
- *
- * @param env execution environment
- * @return input graph
- */
+ @Override
public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env)
.ignoreCommentsEdges(commentPrefix.getValue())
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
index c0a16a8..02c8e58 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
@@ -42,12 +42,7 @@ extends GeneratedGraph<K> {
return simplify.getShortString();
}
- /**
- * Generate the graph as configured.
- *
- * @param env Flink execution environment
- * @return input graph
- */
+ @Override
public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env)
throws Exception {
Graph<K, NullValue, NullValue> graph = super.create(env);
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index 66ba888..cce7afa 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -93,12 +93,7 @@ extends GeneratedMultiGraph<LongValue> {
return 1L << scale.getValue();
}
- /**
- * Generate the graph as configured.
- *
- * @param env Flink execution environment
- * @return input graph
- */
+ @Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
int lp = littleParallelism.getValue().intValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
index c8033de..2c4544f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
@@ -51,12 +51,7 @@ extends SimpleParameter<String> {
super(owner, name);
}
- /**
- * Set the default value and add to the list of choices.
- *
- * @param defaultValue the default value.
- * @return this
- */
+ @Override
public ChoiceParameter setDefaultValue(String defaultValue) {
super.setDefaultValue(defaultValue);
choices.add(defaultValue);
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
index 34194ec..9ac31f2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
@@ -37,12 +37,7 @@ extends SimpleParameter<String> {
super(owner, name);
}
- /**
- * Set the default value.
- *
- * @param defaultValue the default value.
- * @return this
- */
+ @Override
public StringParameter setDefaultValue(String defaultValue) {
super.setDefaultValue(defaultValue);
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
index 1e330dd..c691922 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -69,15 +69,15 @@ extends CopyableValueDriverBaseITCase {
case "short":
case "char":
case "integer":
- checksum = 0x000000003d2f0a9aL;
+ checksum = 0x00000784d2c336cdL;
break;
case "long":
- checksum = 0x000000016aba3720L;
+ checksum = 0x0000078e5ebf2927L;
break;
case "string":
- checksum = 0x0000005bfef84facL;
+ checksum = 0x0000077eddf67481L;
break;
default:
@@ -115,15 +115,15 @@ extends CopyableValueDriverBaseITCase {
case "short":
case "char":
case "integer":
- checksum = 0x0000000001f92b0cL;
+ checksum = 0x0000075b6b9a0ad0L;
break;
case "long":
- checksum = 0x000000000bb355c6L;
+ checksum = 0x00000761619e7f3cL;
break;
case "string":
- checksum = 0x00000002f7b5576aL;
+ checksum = 0x0000079b15eb30acL;
break;
default:
@@ -154,15 +154,15 @@ extends CopyableValueDriverBaseITCase {
case "short":
case "char":
case "integer":
- checksum = 0x00000248fef26209L;
+ checksum = 0x0003a986d6bedc53L;
break;
case "long":
- checksum = 0x000002dcdf0fbb1bL;
+ checksum = 0x0003a8d91c92b884L;
break;
case "string":
- checksum = 0x00035b760ab9da74L;
+ checksum = 0x0003a88fffc33f27L;
break;
default:
@@ -205,15 +205,15 @@ extends CopyableValueDriverBaseITCase {
case "short":
case "char":
case "integer":
- checksum = 0x00000012dee4bf2cL;
+ checksum = 0x0003a95630aae344L;
break;
case "long":
- checksum = 0x00000017a40efbdaL;
+ checksum = 0x0003a9b1e055d59dL;
break;
case "string":
- checksum = 0x000159e8be3e370bL;
+ checksum = 0x0003aa1e3d8f2c6bL;
break;
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
index ad2886f..b456db6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.asm.dataset;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.AnalyticHelper;
import java.io.IOException;
@@ -47,7 +46,6 @@ extends DataSetAnalyticBase<T, List<T>> {
throws Exception {
super.run(input);
- ExecutionEnvironment env = input.getExecutionEnvironment();
serializer = input.getType().createSerializer(env.getConfig());
collectHelper = new CollectHelper<>(serializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
index 9c5c448..d8d796b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
@@ -35,14 +35,16 @@ import org.apache.flink.api.java.operators.CustomUnaryOperation;
public interface DataSetAnalytic<T, R> {
/**
- * This method must be called after the program has executed.
- * 1) "run" analytics and algorithms
- * 2) call ExecutionEnvironment.execute()
- * 3) get analytic results
+ * All {@code DataSetAnalytic} processing must be terminated by an
+ * {@link OutputFormat} and obtained via accumulators rather than
+ * returned by a {@link DataSet}.
*
- * @return the result
+ * @param input input dataset
+ * @return this
+ * @throws Exception
*/
- R getResult();
+
+ DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
/**
* Execute the program and return the result.
@@ -62,13 +64,12 @@ public interface DataSetAnalytic<T, R> {
R execute(String jobName) throws Exception;
/**
- * All {@code DataSetAnalytic} processing must be terminated by an
- * {@link OutputFormat} and obtained via accumulators rather than
- * returned by a {@link DataSet}.
+ * This method must be called after the program has executed.
+ * 1) "run" analytics and algorithms
+ * 2) call ExecutionEnvironment.execute()
+ * 3) get analytic results
*
- * @param input input dataset
- * @return this
- * @throws Exception
+ * @return the result
*/
- DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
+ R getResult();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 27c829b..685031c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,6 +27,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
@@ -59,12 +60,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Deg
}
@Override
- protected String getAlgorithmName() {
- return EdgeDegreesPair.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 3c4e611..0299839 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
@@ -58,12 +59,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
}
@Override
- protected String getAlgorithmName() {
- return EdgeSourceDegrees.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 94788e2..7c13b39 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;
@@ -58,12 +59,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
}
@Override
- protected String getAlgorithmName() {
- return EdgeTargetDegrees.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 0333b8b..b0cb7d7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -33,6 +33,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
@@ -85,12 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
}
@Override
- protected String getAlgorithmName() {
- return VertexDegrees.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index f316b9b..94c2667 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected String getAlgorithmName() {
- return VertexInDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index b04391d..00f2f89 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected String getAlgorithmName() {
- return VertexOutDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index c6d0646..6228cac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -80,12 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
}
@Override
- protected String getAlgorithmName() {
- return EdgeDegreePair.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 88bab5a..01ff7d0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
}
@Override
- protected String getAlgorithmName() {
- return EdgeSourceDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 21918c7..d3316ea 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
}
@Override
- protected String getAlgorithmName() {
- return EdgeTargetDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 9ea99a7..626c11b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Degr
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -98,12 +99,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
}
@Override
- protected String getAlgorithmName() {
- return VertexDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!VertexDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index ae1e5b6..b485507 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
@@ -115,12 +116,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected String getAlgorithmName() {
- return MaximumDegree.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!MaximumDegree.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
index b54f00c..f41268c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
@@ -18,12 +18,17 @@
package org.apache.flink.graph.asm.result;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
/**
* A {@link GraphAlgorithm} result for a pair of vertices.
*/
-public interface BinaryResult<T> {
+public interface BinaryResult<T>
+extends Serializable {
/**
* Get the first vertex ID.
@@ -52,4 +57,25 @@ public interface BinaryResult<T> {
* @param value new vertex ID
*/
void setVertexId1(T value);
+
+ /**
+ * Output each input and a second result with the vertex order flipped.
+ *
+ * <p>The input object is collected first; its two vertex IDs are then
+ * swapped in place through the {@link BinaryResult} setters and the same
+ * instance is collected a second time, so no additional result object is
+ * created by this function.
+ *
+ * <p>NOTE(review): because one mutable instance is emitted twice, this
+ * presumably relies on the collector not retaining a reference to a
+ * collected value - confirm against the runtime's object-reuse rules.
+ *
+ * @param <T> ID type
+ * @param <RT> result type
+ */
+ class MirrorResult<T, RT extends BinaryResult<T>>
+ implements FlatMapFunction<RT, RT> {
+ @Override
+ public void flatMap(RT value, Collector<RT> out)
+ throws Exception {
+ out.collect(value);
+
+ T tmp = value.getVertexId0();
+ value.setVertexId0(value.getVertexId1());
+ value.setVertexId1(tmp);
+
+ out.collect(value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
new file mode 100644
index 0000000..45791ee
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for a pair of vertices.
+ *
+ * <p>Stores the two vertex IDs in private fields and provides the
+ * getter/setter implementations declared by {@link BinaryResult}, so
+ * concrete result classes only need to add their own value fields.
+ * The class stays abstract: {@code toString()} is declared abstract in
+ * {@link ResultBase} and must be supplied by each concrete result.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class BinaryResultBase<K>
+extends ResultBase
+implements BinaryResult<K> {
+
+ private K vertexId0;
+
+ private K vertexId1;
+
+ @Override
+ public K getVertexId0() {
+ return vertexId0;
+ }
+
+ @Override
+ public void setVertexId0(K vertexId0) {
+ this.vertexId0 = vertexId0;
+ }
+
+ @Override
+ public K getVertexId1() {
+ return vertexId1;
+ }
+
+ @Override
+ public void setVertexId1(K vertexId1) {
+ this.vertexId1 = vertexId1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
new file mode 100644
index 0000000..01b13c6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results.
+ *
+ * <p>Its only member re-declares {@link Object#toString()} as abstract,
+ * which forces every concrete result class to provide its own string form.
+ */
+public abstract class ResultBase {
+
+ /**
+ * {@link Object#toString()} must be overridden to write POJO values in the
+ * same form as {@link org.apache.flink.api.java.tuple.Tuple}. Values are
+ * comma-separated and enclosed in parentheses, e.g. "(f0,f1)".
+ *
+ * @return tuple representation string
+ */
+ @Override
+ public abstract String toString();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
index 246b8cb..a546387 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
@@ -20,10 +20,13 @@ package org.apache.flink.graph.asm.result;
import org.apache.flink.graph.GraphAlgorithm;
+import java.io.Serializable;
+
/**
* A {@link GraphAlgorithm} result for three vertices.
*/
-public interface TertiaryResult<T> {
+public interface TertiaryResult<T>
+extends Serializable {
/**
* Get the first vertex ID.
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
new file mode 100644
index 0000000..eb505b8
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for three vertices.
+ *
+ * <p>Stores the three vertex IDs in private fields and provides the
+ * getter/setter implementations declared by {@link TertiaryResult}, so
+ * concrete result classes only need to add their own value fields.
+ * The class stays abstract: {@code toString()} is declared abstract in
+ * {@link ResultBase} and must be supplied by each concrete result.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class TertiaryResultBase<K>
+extends ResultBase
+implements TertiaryResult<K> {
+
+ private K vertexId0;
+
+ private K vertexId1;
+
+ private K vertexId2;
+
+ @Override
+ public K getVertexId0() {
+ return vertexId0;
+ }
+
+ @Override
+ public void setVertexId0(K vertexId0) {
+ this.vertexId0 = vertexId0;
+ }
+
+ @Override
+ public K getVertexId1() {
+ return vertexId1;
+ }
+
+ @Override
+ public void setVertexId1(K vertexId1) {
+ this.vertexId1 = vertexId1;
+ }
+
+ @Override
+ public K getVertexId2() {
+ return vertexId2;
+ }
+
+ @Override
+ public void setVertexId2(K vertexId2) {
+ this.vertexId2 = vertexId2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
index 04a2d30..d861b43 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
@@ -20,10 +20,13 @@ package org.apache.flink.graph.asm.result;
import org.apache.flink.graph.GraphAlgorithm;
+import java.io.Serializable;
+
/**
* A {@link GraphAlgorithm} result for a single vertex.
*/
-public interface UnaryResult<T> {
+public interface UnaryResult<T>
+extends Serializable {
/**
* Get the first vertex ID.
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
new file mode 100644
index 0000000..5bd3229
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for a single vertex.
+ *
+ * <p>Stores the vertex ID in a private field and provides the
+ * getter/setter implementation declared by {@link UnaryResult}, so
+ * concrete result classes only need to add their own value fields.
+ * The class stays abstract: {@code toString()} is declared abstract in
+ * {@link ResultBase} and must be supplied by each concrete result.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class UnaryResultBase<K>
+extends ResultBase
+implements UnaryResult<K> {
+
+ private K vertexId0;
+
+ @Override
+ public K getVertexId0() {
+ return vertexId0;
+ }
+
+ @Override
+ public void setVertexId0(K vertexId0) {
+ this.vertexId0 = vertexId0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index d978096..0d60cd9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
@@ -56,12 +57,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected String getAlgorithmName() {
- return Simplify.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!Simplify.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 617dce1..dd3a9b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -74,12 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
}
@Override
- protected String getAlgorithmName() {
- return Simplify.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!Simplify.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 3956a81..e444b5c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
@@ -71,12 +72,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
}
@Override
- protected String getAlgorithmName() {
- return TranslateEdgeValues.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index d8c5676..3eb5113 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
@@ -73,12 +74,7 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
}
@Override
- protected String getAlgorithmName() {
- return TranslateGraphIds.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 452cb26..38f415c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.asm.translate;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
import org.apache.flink.util.Preconditions;
@@ -71,12 +72,7 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
}
@Override
- protected String getAlgorithmName() {
- return TranslateVertexValues.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
new file mode 100644
index 0000000..7357fbc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.library.clustering;
+
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Common configuration for directed and undirected Triangle Listing algorithms.
+ *
+ * <p>Holds two optional settings, {@code sortTriangleVertices} and
+ * {@code littleParallelism}, together with their fluent setters, and
+ * implements {@code mergeConfiguration} once (as {@code final}) for all
+ * subclasses. Merging returns {@code false} when the other algorithm is not
+ * a {@code TriangleListingBase}; otherwise {@code sortTriangleVertices} is
+ * combined via {@code OptionalBoolean.mergeWith} and {@code littleParallelism}
+ * is resolved as follows: if this instance still has
+ * {@code PARALLELISM_DEFAULT} the other value is taken, if the other has
+ * {@code PARALLELISM_DEFAULT} this value is kept, and if both were set
+ * explicitly the smaller of the two is used.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <R> result type
+ */
+public abstract class TriangleListingBase<K, VV, EV, R>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
+
+ // Optional configuration
+ protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
+
+ protected int littleParallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Normalize the triangle listing such that for each result (K0, K1, K2)
+ * the vertex IDs are sorted K0 < K1 < K2.
+ *
+ * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
+ * @return this
+ */
+ public TriangleListingBase<K, VV, EV, R> setSortTriangleVertices(boolean sortTriangleVertices) {
+ this.sortTriangleVertices.set(sortTriangleVertices);
+
+ return this;
+ }
+
+ /**
+ * Override the parallelism of operators processing small amounts of data.
+ *
+ * <p>Only a value greater than zero or {@code PARALLELISM_DEFAULT} passes
+ * the argument check; the field is left unchanged when the check fails.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public TriangleListingBase<K, VV, EV, R> setLittleParallelism(int littleParallelism) {
+ Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+ "The parallelism must be greater than zero.");
+
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ @Override
+ protected final boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
+ Preconditions.checkNotNull(other);
+
+ if (!TriangleListingBase.class.isAssignableFrom(other.getClass())) {
+ return false;
+ }
+
+ TriangleListingBase rhs = (TriangleListingBase) other;
+
+ // merge configurations: an explicitly set parallelism wins over the default, two explicit values resolve to the smaller
+
+ sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+ littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+ ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index b980244..03c8808 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.asm.result.UnaryResultBase;
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
@@ -100,12 +100,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected String getAlgorithmName() {
- return LocalClusteringCoefficient.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
@@ -192,17 +187,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
throws Exception {
- byte bitmask = value.f3.getValue();
+ byte bitmask = value.getBitmask().getValue();
- output.f0 = value.f0;
+ output.f0 = value.getVertexId0();
output.f1 = ((bitmask & 0b000011) == 0b000011) ? two : one;
out.collect(output);
- output.f0 = value.f1;
+ output.f0 = value.getVertexId1();
output.f1 = ((bitmask & 0b001100) == 0b001100) ? two : one;
out.collect(output);
- output.f0 = value.f2;
+ output.f0 = value.getVertexId2();
output.f1 = ((bitmask & 0b110000) == 0b110000) ? two : one;
out.collect(output);
}
@@ -229,8 +224,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*
* @param <T> ID type
*/
- @ForwardedFieldsFirst("0; 1.0->1")
- @ForwardedFieldsSecond("0")
+ @ForwardedFieldsFirst("0->vertexId0; 1.0->degree")
+ @ForwardedFieldsSecond("0->vertexId0")
private static class JoinVertexDegreeWithTriangleCount<T>
implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
private LongValue zero = new LongValue(0);
@@ -240,35 +235,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
throws Exception {
- output.f0 = vertexAndDegree.f0;
- output.f1 = vertexAndDegree.f1.f0;
- output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+ output.setVertexId0(vertexAndDegree.f0);
+ output.setDegree(vertexAndDegree.f1.f0);
+ output.setTriangleCount((vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1);
return output;
}
}
/**
- * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
+ * A result for the directed Local Clustering Coefficient algorithm.
*
* @param <T> ID type
*/
public static class Result<T>
- extends Tuple3<T, LongValue, LongValue>
- implements PrintableResult, UnaryResult<T> {
- public static final int HASH_SEED = 0x37a208c4;
+ extends UnaryResultBase<T>
+ implements PrintableResult {
+ private LongValue degree;
- private MurmurHash hasher = new MurmurHash(HASH_SEED);
-
- @Override
- public T getVertexId0() {
- return f0;
- }
-
- @Override
- public void setVertexId0(T value) {
- f0 = value;
- }
+ private LongValue triangleCount;
/**
* Get the vertex degree.
@@ -276,7 +261,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @return vertex degree
*/
public LongValue getDegree() {
- return f1;
+ return degree;
+ }
+
+ /**
+ * Set the vertex degree.
+ *
+ * @param degree vertex degree
+ */
+ public void setDegree(LongValue degree) {
+ this.degree = degree;
}
/**
@@ -286,7 +280,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @return triangle count
*/
public LongValue getTriangleCount() {
- return f2;
+ return triangleCount;
+ }
+
+ /**
+ * Set the number of triangles containing this vertex; equivalently,
+ * this is the number of edges between neighbors of this vertex.
+ *
+ * @param triangleCount triangle count
+ */
+ public void setTriangleCount(LongValue triangleCount) {
+ this.triangleCount = triangleCount;
}
/**
@@ -306,11 +310,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
}
+ @Override
+ public String toString() {
+ return "(" + getVertexId0()
+ + "," + getDegree()
+ + "," + getTriangleCount()
+ + ")";
+ }
+
/**
* Format values into a human-readable string.
*
* @return verbose string
*/
+ @Override
public String toPrintableString() {
return "Vertex ID: " + getVertexId0()
+ ", vertex degree: " + getDegree()
@@ -318,12 +331,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
}
+ // ----------------------------------------------------------------------------------------
+
+ public static final int HASH_SEED = 0x37a208c4;
+
+ private transient MurmurHash hasher;
+
@Override
public int hashCode() {
+ if (hasher == null) {
+ hasher = new MurmurHash(HASH_SEED);
+ }
+
return hasher.reset()
- .hash(f0.hashCode())
- .hash(f1.getValue())
- .hash(f2.getValue())
+ .hash(getVertexId0().hashCode())
+ .hash(degree.getValue())
+ .hash(triangleCount.getValue())
.hash();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 949dd4c..6ec9f0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -102,24 +102,24 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
BigInteger three = BigInteger.valueOf(3);
BigInteger six = BigInteger.valueOf(6);
- BigInteger vertexCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "vc"));
- BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "uec") / 2);
- BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "bec") / 2);
- BigInteger triplet021dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021d"));
- BigInteger triplet021uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021u"));
- BigInteger triplet021cCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021c"));
- BigInteger triplet111dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111d"));
- BigInteger triplet111uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111u"));
- BigInteger triplet201Count = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "201"));
+ BigInteger vertexCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "vc"));
+ BigInteger unidirectionalEdgeCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "uec") / 2);
+ BigInteger bidirectionalEdgeCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "bec") / 2);
+ BigInteger triplet021dCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021d"));
+ BigInteger triplet021uCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021u"));
+ BigInteger triplet021cCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021c"));
+ BigInteger triplet111dCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "111d"));
+ BigInteger triplet111uCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "111u"));
+ BigInteger triplet201Count = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "201"));
// triads with three connecting edges = closed triplet = triangle
- BigInteger triangle030tCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030t"));
- BigInteger triangle030cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030c"));
- BigInteger triangle120dCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120d"));
- BigInteger triangle120uCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120u"));
- BigInteger triangle120cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120c"));
- BigInteger triangle210Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "210"));
- BigInteger triangle300Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "300"));
+ BigInteger triangle030tCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "030t"));
+ BigInteger triangle030cCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "030c"));
+ BigInteger triangle120dCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120d"));
+ BigInteger triangle120uCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120u"));
+ BigInteger triangle120cCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120c"));
+ BigInteger triangle210Count = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "210"));
+ BigInteger triangle300Count = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "300"));
// triads with two connecting edges = open triplet;
// each triangle deducts the count of three triplets
@@ -211,7 +211,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
@Override
public void writeRecord(TriangleListing.Result<T> record) throws IOException {
- triangleCount[record.f3.getValue()]++;
+ triangleCount[record.getBitmask().getValue()]++;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 38b0746..23e88d1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -36,21 +36,18 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.asm.result.TertiaryResultBase;
+import org.apache.flink.graph.library.clustering.TriangleListingBase;
import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
/**
* Generates a listing of distinct triangles from the input graph.
*
@@ -68,62 +65,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* @param <EV> edge value type
*/
public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
- // Optional configuration
- private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
-
- private int littleParallelism = PARALLELISM_DEFAULT;
-
- /**
- * Normalize the triangle listing such that for each result (K0, K1, K2)
- * the vertex IDs are sorted K0 < K1 < K2.
- *
- * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
- * @return this
- */
- public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
- this.sortTriangleVertices.set(sortTriangleVertices);
-
- return this;
- }
-
- /**
- * Override the parallelism of operators processing small amounts of data.
- *
- * @param littleParallelism operator parallelism
- * @return this
- */
- public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
- Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
- "The parallelism must be greater than zero.");
-
- this.littleParallelism = littleParallelism;
-
- return this;
- }
-
- @Override
- protected String getAlgorithmName() {
- return TriangleListing.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
- Preconditions.checkNotNull(other);
-
- if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
- return false;
- }
-
- TriangleListing rhs = (TriangleListing) other;
-
- sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
- littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
- ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
- return true;
- }
+extends TriangleListingBase<K, VV, EV, Result<K>> {
/*
* Implementation notes:
@@ -349,8 +291,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*
* @param <T> ID type
*/
- @ForwardedFieldsFirst("0; 1; 2")
- @ForwardedFieldsSecond("0; 1")
+ @ForwardedFieldsFirst("0->vertexId0; 1->vertexId1; 2->vertexId2")
+ @ForwardedFieldsSecond("0->vertexId0; 1->vertexId1")
private static final class ProjectTriangles<T>
implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> {
private Result<T> output = new Result<>();
@@ -358,10 +300,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge)
throws Exception {
- output.f0 = triplet.f0;
- output.f1 = triplet.f1;
- output.f2 = triplet.f2;
- output.f3.setValue((byte) (triplet.f3.getValue() | edge.f2.getValue()));
+ output.setVertexId0(triplet.f0);
+ output.setVertexId1(triplet.f1);
+ output.setVertexId2(triplet.f2);
+ output.setBitmask((byte) (triplet.f3.getValue() | edge.f2.getValue()));
return output;
}
}
@@ -378,29 +320,29 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
public Result<T> map(Result<T> value)
throws Exception {
// by the triangle listing algorithm we know f1 < f2
- if (value.f0.compareTo(value.f1) > 0) {
- byte bitmask = value.f3.getValue();
+ if (value.getVertexId0().compareTo(value.getVertexId1()) > 0) {
+ byte bitmask = value.getBitmask().getValue();
- T tempVal = value.f0;
- value.f0 = value.f1;
+ T tempVal = value.getVertexId0();
+ value.setVertexId0(value.getVertexId1());
- if (tempVal.compareTo(value.f2) < 0) {
- value.f1 = tempVal;
+ if (tempVal.compareTo(value.getVertexId2()) < 0) {
+ value.setVertexId1(tempVal);
int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
int f0f2 = (bitmask & 0b001100) >>> 2;
int f1f2 = (bitmask & 0b000011) << 2;
- value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
+ value.setBitmask((byte) (f0f1 | f0f2 | f1f2));
} else {
- value.f1 = value.f2;
- value.f2 = tempVal;
+ value.setVertexId1(value.getVertexId2());
+ value.setVertexId2(tempVal);
int f0f1 = (bitmask & 0b000011) << 4;
int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
- value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
+ value.setBitmask((byte) (f0f1 | f0f2 | f1f2));
}
}
@@ -409,49 +351,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
/**
- * Wraps {@link Tuple4} to encapsulate results from the directed Triangle Listing algorithm.
+ * A result for the directed Triangle Listing algorithm.
*
* @param <T> ID type
*/
public static class Result<T>
- extends Tuple4<T, T, T, ByteValue>
- implements PrintableResult, TertiaryResult<T> {
- /**
- * No-args constructor.
- */
- public Result() {
- f3 = new ByteValue();
- }
-
- @Override
- public T getVertexId0() {
- return f0;
- }
-
- @Override
- public void setVertexId0(T value) {
- f0 = value;
- }
-
- @Override
- public T getVertexId1() {
- return f1;
- }
-
- @Override
- public void setVertexId1(T value) {
- f1 = value;
- }
-
- @Override
- public T getVertexId2() {
- return f2;
- }
-
- @Override
- public void setVertexId2(T value) {
- f2 = value;
- }
+ extends TertiaryResultBase<T>
+ implements PrintableResult {
+ private ByteValue bitmask = new ByteValue();
/**
* Get the bitmask indicating the presence of the six potential
@@ -462,16 +369,37 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @see EdgeOrder
*/
public ByteValue getBitmask() {
- return f3;
+ return bitmask;
}
/**
- * Format values into a human-readable string.
+ * Set the bitmask indicating the presence of the six potential
+ * connecting edges.
*
- * @return verbose string
+ * @param bitmask the edge bitmask
+ *
+ * @see EdgeOrder
*/
+ public void setBitmask(ByteValue bitmask) {
+ this.bitmask = bitmask;
+ }
+
+ private void setBitmask(byte bitmask) {
+ this.bitmask.setValue(bitmask);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + getVertexId0()
+ + "," + getVertexId1()
+ + "," + getVertexId2()
+ + "," + bitmask
+ + ")";
+ }
+
+ @Override
public String toPrintableString() {
- byte bitmask = f3.getValue();
+ byte bitmask = getBitmask().getValue();
return "1st vertex ID: " + getVertexId0()
+ ", 2nd vertex ID: " + getVertexId1()
@@ -492,8 +420,28 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
return "<->";
} else {
throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
- + mask + ", shift = " + shift);
+ + mask + ", shift = " + shift + ")");
}
}
+
+ // ----------------------------------------------------------------------------------------
+
+ public static final int HASH_SEED = 0x0846ea21;
+
+ private transient MurmurHash hasher;
+
+ @Override
+ public int hashCode() {
+ if (hasher == null) {
+ hasher = new MurmurHash(HASH_SEED);
+ }
+
+ return hasher.reset()
+ .hash(getVertexId0().hashCode())
+ .hash(getVertexId1().hashCode())
+ .hash(getVertexId2().hashCode())
+ .hash(bitmask.getValue())
+ .hash();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index e94310f..3cb4b87 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -27,14 +27,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.asm.result.UnaryResultBase;
import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
@@ -99,12 +99,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
@Override
- protected String getAlgorithmName() {
- return LocalClusteringCoefficient.class.getName();
- }
-
- @Override
- protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+ protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);
if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
@@ -186,13 +181,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
throws Exception {
- output.f0 = value.f0;
+ output.f0 = value.getVertexId0();
out.collect(output);
- output.f0 = value.f1;
+ output.f0 = value.getVertexId1();
out.collect(output);
- output.f0 = value.f2;
+ output.f0 = value.getVertexId2();
out.collect(output);
}
}
@@ -218,8 +213,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*
* @param <T> ID type
*/
- @ForwardedFieldsFirst("0; 1")
- @ForwardedFieldsSecond("0")
+ @ForwardedFieldsFirst("0->vertexId0; 1->degree")
+ @ForwardedFieldsSecond("0->vertexId0")
private static class JoinVertexDegreeWithTriangleCount<T>
implements JoinFunction<Vertex<T, LongValue>, Tuple2<T, LongValue>, Result<T>> {
private LongValue zero = new LongValue(0);
@@ -229,35 +224,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public Result<T> join(Vertex<T, LongValue> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
throws Exception {
- output.f0 = vertexAndDegree.f0;
- output.f1 = vertexAndDegree.f1;
- output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+ output.setVertexId0(vertexAndDegree.f0);
+ output.setDegree(vertexAndDegree.f1);
+ output.setTriangleCount((vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1);
return output;
}
}
/**
- * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
+ * A result for the undirected Local Clustering Coefficient algorithm.
*
* @param <T> ID type
*/
public static class Result<T>
- extends Tuple3<T, LongValue, LongValue>
- implements PrintableResult, UnaryResult<T> {
- private static final int HASH_SEED = 0xc23937c1;
-
- private MurmurHash hasher = new MurmurHash(HASH_SEED);
-
- @Override
- public T getVertexId0() {
- return f0;
- }
+ extends UnaryResultBase<T>
+ implements PrintableResult {
+ private LongValue degree;
- @Override
- public void setVertexId0(T value) {
- f0 = value;
- }
+ private LongValue triangleCount;
/**
* Get the vertex degree.
@@ -265,7 +250,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @return vertex degree
*/
public LongValue getDegree() {
- return f1;
+ return degree;
+ }
+
+ /**
+ * Set the vertex degree.
+ *
+ * @param degree vertex degree
+ */
+ public void setDegree(LongValue degree) {
+ this.degree = degree;
}
/**
@@ -275,7 +269,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @return triangle count
*/
public LongValue getTriangleCount() {
- return f2;
+ return triangleCount;
+ }
+
+ /**
+ * Set the number of triangles containing this vertex; equivalently,
+ * this is the number of edges between neighbors of this vertex.
+ *
+ * @param triangleCount triangle count
+ */
+ public void setTriangleCount(LongValue triangleCount) {
+ this.triangleCount = triangleCount;
}
/**
@@ -295,24 +299,43 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
}
+ @Override
+ public String toString() {
+ return "(" + getVertexId0()
+ + "," + degree
+ + "," + triangleCount
+ + ")";
+ }
+
/**
* Format values into a human-readable string.
*
* @return verbose string
*/
+ @Override
public String toPrintableString() {
return "Vertex ID: " + getVertexId0()
- + ", vertex degree: " + getDegree()
- + ", triangle count: " + getTriangleCount()
+ + ", vertex degree: " + degree
+ + ", triangle count: " + triangleCount
+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
}
+ // ----------------------------------------------------------------------------------------
+
+ public static final int HASH_SEED = 0xc23937c1;
+
+ private transient MurmurHash hasher;
+
@Override
public int hashCode() {
+ if (hasher == null) {
+ hasher = new MurmurHash(HASH_SEED);
+ }
+
return hasher.reset()
- .hash(f0.hashCode())
- .hash(f1.getValue())
- .hash(f2.getValue())
+ .hash(getVertexId0().hashCode())
+ .hash(degree.getValue())
+ .hash(triangleCount.getValue())
.hash();
}
}