You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/26 18:53:10 UTC
flink git commit: [hotfix] [gelly] Driver usage and configuration
Repository: flink
Updated Branches:
refs/heads/master fa1498616 -> baf057a48
[hotfix] [gelly] Driver usage and configuration
Fixes driver usages to print error messages.
Registers user command-line parameters for web UI configuration.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baf057a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baf057a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baf057a4
Branch: refs/heads/master
Commit: baf057a4815ebee67f439a55074280fb9ac48aaf
Parents: fa14986
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 12:06:43 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 26 12:06:43 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Usage.java | 25 ++++++--
.../graph/drivers/ClusteringCoefficient.java | 15 +++--
.../apache/flink/graph/drivers/Graph500.java | 1 +
.../flink/graph/drivers/GraphMetrics.java | 4 ++
.../org/apache/flink/graph/drivers/HITS.java | 4 ++
.../flink/graph/drivers/JaccardIndex.java | 7 ++-
.../flink/graph/drivers/TriangleListing.java | 62 ++++++++++++++------
.../annotate/directed/VertexInDegree.java | 2 +-
.../annotate/directed/VertexOutDegree.java | 2 +-
.../annotate/undirected/VertexDegree.java | 2 +-
.../directed/LocalClusteringCoefficient.java | 2 +-
.../undirected/LocalClusteringCoefficient.java | 2 +-
.../flink/graph/library/link_analysis/HITS.java | 10 ++--
.../library/metric/directed/EdgeMetrics.java | 2 +-
.../library/metric/undirected/EdgeMetrics.java | 2 +-
15 files changed, 100 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index 9d8f116..d923bf0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -18,6 +18,9 @@
package org.apache.flink.graph;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.client.program.ProgramParametrizationException;
+
/**
* This default main class prints usage listing available classes.
*/
@@ -45,16 +48,26 @@ public class Usage {
org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
};
- public static void main(String[] args) throws Exception {
- System.out.println("Driver classes call algorithms from the Gelly library:");
+ private static String getUsage() {
+ StrBuilder strBuilder = new StrBuilder();
+
+ strBuilder.appendNewLine();
+ strBuilder.appendln("Driver classes call algorithms from the Gelly library:");
for (Class cls : DRIVERS) {
- System.out.println(" " + cls.getName());
+ strBuilder.append(" ").appendln(cls.getName());
}
- System.out.println("");
- System.out.println("Example classes illustrate Gelly APIs or alternative algorithms:");
+ strBuilder.appendNewLine();
+ strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:");
for (Class cls : EXAMPLES) {
- System.out.println(" " + cls.getName());
+ strBuilder.append(" ").appendln(cls.getName());
}
+
+ return strBuilder.toString();
+ }
+
+ public static void main(String[] args) throws Exception {
+ // this exception is throw to prevent Flink from printing an error message
+ throw new ProgramParametrizationException(getUsage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 18b0406..cd28ee4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -87,6 +87,8 @@ public class ClusteringCoefficient {
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+ .appendNewLine()
+ .appendln("Usage error: " + message)
.toString();
}
@@ -96,6 +98,7 @@ public class ClusteringCoefficient {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
if (! parameters.has("directed")) {
throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
@@ -131,7 +134,8 @@ public class ClusteringCoefficient {
if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
}
gcc = graph
@@ -146,7 +150,8 @@ public class ClusteringCoefficient {
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+ .setParallelism(little_parallelism));
}
gcc = graph
@@ -168,7 +173,8 @@ public class ClusteringCoefficient {
if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
}
gcc = graph
@@ -183,7 +189,8 @@ public class ClusteringCoefficient {
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
+ .setParallelism(little_parallelism));
}
gcc = graph
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 8f9a54a..51ef66f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -82,6 +82,7 @@ public class Graph500 {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
// Generate RMat graph
int scale = parameters.getInt("scale", DEFAULT_SCALE);
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 4fb11c3..899ae66 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -68,6 +68,8 @@ public class GraphMetrics {
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
.appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+ .appendNewLine()
+ .appendln("Usage error: " + message)
.toString();
}
@@ -77,6 +79,8 @@ public class GraphMetrics {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
+
if (! parameters.has("directed")) {
throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index e0a233a..b035bd7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -78,6 +78,8 @@ public class HITS {
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+ .appendNewLine()
+ .appendln("Usage error: " + message)
.toString();
}
@@ -87,6 +89,8 @@ public class HITS {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
+
int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
DataSet hits;
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 5c173e0..cb11af9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -95,6 +95,7 @@ public class JaccardIndex {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
@@ -121,7 +122,8 @@ public class JaccardIndex {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+ .setParallelism(little_parallelism));
}
ji = graph
@@ -135,7 +137,8 @@ public class JaccardIndex {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
}
ji = graph
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 954f732..92f6a2c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -44,6 +44,8 @@ import org.apache.flink.types.StringValue;
import java.text.NumberFormat;
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
/**
* Driver for the library implementation of Triangle Listing.
*
@@ -79,6 +81,8 @@ public class TriangleListing {
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+ .appendNewLine()
+ .appendln("Usage error: " + message)
.toString();
}
@@ -88,11 +92,15 @@ public class TriangleListing {
env.getConfig().enableObjectReuse();
ParameterTool parameters = ParameterTool.fromArgs(args);
+ env.getConfig().setGlobalJobParameters(parameters);
+
if (! parameters.has("directed")) {
throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
}
boolean directedAlgorithm = parameters.getBoolean("directed");
+ int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
DataSet tl;
switch (parameters.get("input", "")) {
@@ -117,19 +125,23 @@ public class TriangleListing {
if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
}
tl = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+ .setParallelism(little_parallelism));
}
tl = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} break;
@@ -140,19 +152,23 @@ public class TriangleListing {
if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism));
}
tl = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
+ .setParallelism(little_parallelism));
}
tl = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} break;
@@ -178,13 +194,18 @@ public class TriangleListing {
if (directedAlgorithm) {
if (scale > 32) {
tl = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
tl = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -194,13 +215,18 @@ public class TriangleListing {
if (scale > 32) {
tl = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
} else {
tl = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
+ .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+ .setParallelism(little_parallelism))
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
}
}
} break;
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 934c4ed..5fdd8f9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> targetDegree = targetIds
.groupBy(0)
.reduce(new DegreeCount<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index a8745ca..8e3e9c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
.groupBy(0)
.reduce(new DegreeCount<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index f466f85..b731548 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -145,7 +145,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
DataSet<Vertex<K, LongValue>> degree = vertexIds
.groupBy(0)
.reduce(new DegreeCount<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Degree count");
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 608500b..93fb678 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountTriangles<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.name("Count triangles");
// u, deg(u)
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 3621156..b22a0ce 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
.groupBy(0)
.reduce(new CountTriangles<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.name("Count triangles");
// u, deg(u)
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 9e3511c..1be55f0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -171,7 +171,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.name("Initial scores")
.groupBy(0)
.reduce(new SumScores<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
@@ -188,7 +188,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.name("Hub")
.groupBy(0)
.reduce(new SumScore<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
@@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
@@ -212,7 +212,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.name("Authority")
.groupBy(0)
.reduce(new SumScore<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
@@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum");
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index b3e1e30..648fb76 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -117,7 +117,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
.name("Reduce edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");
http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 6bce42c..1c636ff 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -123,7 +123,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
.name("Edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<K>())
- .setCombineHint(CombineHint.HASH)
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");