You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/26 18:53:10 UTC

flink git commit: [hotfix] [gelly] Driver usage and configuration

Repository: flink
Updated Branches:
  refs/heads/master fa1498616 -> baf057a48


[hotfix] [gelly] Driver usage and configuration

Fixes driver usages to print error messages.

Registers user command-line parameters for web UI configuration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/baf057a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/baf057a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/baf057a4

Branch: refs/heads/master
Commit: baf057a4815ebee67f439a55074280fb9ac48aaf
Parents: fa14986
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 12:06:43 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 26 12:06:43 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Usage.java | 25 ++++++--
 .../graph/drivers/ClusteringCoefficient.java    | 15 +++--
 .../apache/flink/graph/drivers/Graph500.java    |  1 +
 .../flink/graph/drivers/GraphMetrics.java       |  4 ++
 .../org/apache/flink/graph/drivers/HITS.java    |  4 ++
 .../flink/graph/drivers/JaccardIndex.java       |  7 ++-
 .../flink/graph/drivers/TriangleListing.java    | 62 ++++++++++++++------
 .../annotate/directed/VertexInDegree.java       |  2 +-
 .../annotate/directed/VertexOutDegree.java      |  2 +-
 .../annotate/undirected/VertexDegree.java       |  2 +-
 .../directed/LocalClusteringCoefficient.java    |  2 +-
 .../undirected/LocalClusteringCoefficient.java  |  2 +-
 .../flink/graph/library/link_analysis/HITS.java | 10 ++--
 .../library/metric/directed/EdgeMetrics.java    |  2 +-
 .../library/metric/undirected/EdgeMetrics.java  |  2 +-
 15 files changed, 100 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index 9d8f116..d923bf0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.graph;
 
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.client.program.ProgramParametrizationException;
+
 /**
  * This default main class prints usage listing available classes.
  */
@@ -45,16 +48,26 @@ public class Usage {
 		org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
 	};
 
-	public static void main(String[] args) throws Exception {
-		System.out.println("Driver classes call algorithms from the Gelly library:");
+	private static String getUsage() {
+		StrBuilder strBuilder = new StrBuilder();
+
+		strBuilder.appendNewLine();
+		strBuilder.appendln("Driver classes call algorithms from the Gelly library:");
 		for (Class cls : DRIVERS) {
-			System.out.println("  " + cls.getName());
+			strBuilder.append("  ").appendln(cls.getName());
 		}
 
-		System.out.println("");
-		System.out.println("Example classes illustrate Gelly APIs or alternative algorithms:");
+		strBuilder.appendNewLine();
+		strBuilder.appendln("Example classes illustrate Gelly APIs or alternative algorithms:");
 		for (Class cls : EXAMPLES) {
-			System.out.println("  " + cls.getName());
+			strBuilder.append("  ").appendln(cls.getName());
 		}
+
+		return strBuilder.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// this exception is thrown to prevent Flink from printing an error message
+		throw new ProgramParametrizationException(getUsage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 18b0406..cd28ee4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -87,6 +87,8 @@ public class ClusteringCoefficient {
 			.appendln("  --output print")
 			.appendln("  --output hash")
 			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
 			.toString();
 	}
 
@@ -96,6 +98,7 @@ public class ClusteringCoefficient {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
 
 		if (! parameters.has("directed")) {
 			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
@@ -131,7 +134,8 @@ public class ClusteringCoefficient {
 						if (directedAlgorithm) {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+										.setParallelism(little_parallelism));
 							}
 
 							gcc = graph
@@ -146,7 +150,8 @@ public class ClusteringCoefficient {
 						} else {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+										.setParallelism(little_parallelism));
 							}
 
 							gcc = graph
@@ -168,7 +173,8 @@ public class ClusteringCoefficient {
 						if (directedAlgorithm) {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+										.setParallelism(little_parallelism));
 							}
 
 							gcc = graph
@@ -183,7 +189,8 @@ public class ClusteringCoefficient {
 						} else {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
+										.setParallelism(little_parallelism));
 							}
 
 							gcc = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 8f9a54a..51ef66f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -82,6 +82,7 @@ public class Graph500 {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
 
 		// Generate RMat graph
 		int scale = parameters.getInt("scale", DEFAULT_SCALE);

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 4fb11c3..899ae66 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -68,6 +68,8 @@ public class GraphMetrics {
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
 			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
 			.toString();
 	}
 
@@ -77,6 +79,8 @@ public class GraphMetrics {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
+
 		if (! parameters.has("directed")) {
 			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index e0a233a..b035bd7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -78,6 +78,8 @@ public class HITS {
 			.appendln("  --output print")
 			.appendln("  --output hash")
 			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
 			.toString();
 	}
 
@@ -87,6 +89,8 @@ public class HITS {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
+
 		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
 
 		DataSet hits;

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 5c173e0..cb11af9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -95,6 +95,7 @@ public class JaccardIndex {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
 
 		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
 
@@ -121,7 +122,8 @@ public class JaccardIndex {
 
 						if (parameters.getBoolean("simplify", false)) {
 							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+									.setParallelism(little_parallelism));
 						}
 
 						ji = graph
@@ -135,7 +137,8 @@ public class JaccardIndex {
 
 						if (parameters.getBoolean("simplify", false)) {
 							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+								.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+									.setParallelism(little_parallelism));
 						}
 
 						ji = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 954f732..92f6a2c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -44,6 +44,8 @@ import org.apache.flink.types.StringValue;
 
 import java.text.NumberFormat;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Driver for the library implementation of Triangle Listing.
  *
@@ -79,6 +81,8 @@ public class TriangleListing {
 			.appendln("  --output print")
 			.appendln("  --output hash")
 			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
 			.toString();
 	}
 
@@ -88,11 +92,15 @@ public class TriangleListing {
 		env.getConfig().enableObjectReuse();
 
 		ParameterTool parameters = ParameterTool.fromArgs(args);
+		env.getConfig().setGlobalJobParameters(parameters);
+
 		if (! parameters.has("directed")) {
 			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
 		}
 		boolean directedAlgorithm = parameters.getBoolean("directed");
 
+		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
 		DataSet tl;
 
 		switch (parameters.get("input", "")) {
@@ -117,19 +125,23 @@ public class TriangleListing {
 						if (directedAlgorithm) {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+										.setParallelism(little_parallelism));
 							}
 
 							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 						} else {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
+										.setParallelism(little_parallelism));
 							}
 
 							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 						}
 					} break;
 
@@ -140,19 +152,23 @@ public class TriangleListing {
 						if (directedAlgorithm) {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+										.setParallelism(little_parallelism));
 							}
 
 							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 						} else {
 							if (parameters.getBoolean("simplify", false)) {
 								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
+										.setParallelism(little_parallelism));
 							}
 
 							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 						}
 					} break;
 
@@ -178,13 +194,18 @@ public class TriangleListing {
 				if (directedAlgorithm) {
 					if (scale > 32) {
 						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 					} else {
 						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 					}
 				} else {
 					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -194,13 +215,18 @@ public class TriangleListing {
 
 					if (scale > 32) {
 						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 					} else {
 						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 					}
 				}
 			} break;

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 934c4ed..5fdd8f9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> targetDegree = targetIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index a8745ca..8e3e9c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -121,7 +121,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index f466f85..b731548 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -145,7 +145,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		DataSet<Vertex<K, LongValue>> degree = vertexIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Degree count");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 608500b..93fb678 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
 			.reduce(new CountTriangles<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
 		// u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 3621156..b22a0ce 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -151,7 +151,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices
 			.groupBy(0)
 			.reduce(new CountTriangles<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.name("Count triangles");
 
 		// u, deg(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 9e3511c..1be55f0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -171,7 +171,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Initial scores")
 			.groupBy(0)
 			.reduce(new SumScores<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -188,7 +188,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Hub")
 			.groupBy(0)
 			.reduce(new SumScore<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -212,7 +212,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Authority")
 			.groupBy(0)
 			.reduce(new SumScore<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 
@@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.setParallelism(parallelism)
 				.name("Square")
 			.reduce(new Sum())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index b3e1e30..648fb76 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -117,7 +117,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.name("Reduce edge stats")
 			.groupBy(0)
 			.reduce(new SumEdgeStats<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum edge stats");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/baf057a4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 6bce42c..1c636ff 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -123,7 +123,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.name("Edge stats")
 			.groupBy(0)
 			.reduce(new SumEdgeStats<K>())
-				.setCombineHint(CombineHint.HASH)
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Sum edge stats");