You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 11:04:13 UTC

[4/7] flink git commit: [FLINK-6707] [examples] Activate strict checkstyle for flink-examples

[FLINK-6707] [examples] Activate strict checkstyle for flink-examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/789ed8a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/789ed8a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/789ed8a8

Branch: refs/heads/master
Commit: 789ed8a8246d140e1621a5860645a747132d6618
Parents: d481f29
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 24 13:24:02 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 09:48:30 2017 +0200

----------------------------------------------------------------------
 .../flink/examples/java/clustering/KMeans.java  |  31 +-
 .../java/clustering/util/KMeansData.java        |   4 +-
 .../clustering/util/KMeansDataGenerator.java    |  68 +-
 .../flink/examples/java/distcp/DistCp.java      |  12 +-
 .../examples/java/distcp/FileCopyTask.java      |   9 +-
 .../java/distcp/FileCopyTaskInputFormat.java    |   6 +-
 .../java/distcp/FileCopyTaskInputSplit.java     |   6 +-
 .../java/graph/ConnectedComponents.java         |  74 +-
 .../examples/java/graph/EnumTriangles.java      |  70 +-
 .../flink/examples/java/graph/PageRank.java     |  84 +--
 .../java/graph/TransitiveClosureNaive.java      |  22 +-
 .../graph/util/ConnectedComponentsData.java     |  18 +-
 .../java/graph/util/EnumTrianglesData.java      |  19 +-
 .../java/graph/util/EnumTrianglesDataTypes.java | 128 ++--
 .../examples/java/graph/util/PageRankData.java  |  22 +-
 .../java/misc/CollectionExecutionExample.java   |  50 +-
 .../flink/examples/java/misc/PiEstimation.java  |  35 +-
 .../examples/java/ml/LinearRegression.java      |  80 +--
 .../java/ml/util/LinearRegressionData.java      |   2 +-
 .../ml/util/LinearRegressionDataGenerator.java  |  21 +-
 .../relational/EmptyFieldsCountAccumulator.java |  28 +-
 .../examples/java/relational/TPCHQuery10.java   | 107 ++-
 .../examples/java/relational/TPCHQuery3.java    | 183 +++--
 .../java/relational/WebLogAnalysis.java         |  72 +-
 .../java/relational/util/WebLogData.java        | 709 +++++++++----------
 .../relational/util/WebLogDataGenerator.java    |  27 +-
 .../examples/java/wordcount/WordCount.java      |  33 +-
 .../examples/java/wordcount/WordCountPojo.java  |  38 +-
 .../examples/scala/graph/EnumTriangles.scala    |  11 +-
 .../examples/scala/graph/PageRankBasic.scala    |   3 +-
 .../examples/scala/relational/TPCHQuery10.scala |   3 +-
 .../examples/scala/relational/TPCHQuery3.scala  |   3 +-
 .../examples/async/AsyncIOExample.java          |  13 +-
 .../examples/iteration/IterateExample.java      |  14 +-
 .../iteration/util/IterateExampleData.java      |   3 +
 .../streaming/examples/join/WindowJoin.java     |  17 +-
 .../streaming/examples/kafka/ReadFromKafka.java |   5 +-
 .../examples/kafka/WriteIntoKafka.java          |  12 +-
 .../ml/IncrementalLearningSkeleton.java         |   8 +-
 .../util/IncrementalLearningSkeletonData.java   |   3 +
 .../examples/sideoutput/SideOutputExample.java  |  16 +-
 .../examples/socket/SocketWindowWordCount.java  |   4 +-
 .../examples/twitter/TwitterExample.java        |  31 +-
 .../twitter/util/TwitterExampleData.java        |   4 +-
 .../examples/utils/ThrottledIterator.java       |   4 +-
 .../GroupedProcessingTimeWindowExample.java     |  40 +-
 .../examples/windowing/SessionWindowing.java    |   6 +-
 .../examples/windowing/TopSpeedWindowing.java   |   2 +-
 .../examples/windowing/WindowWordCount.java     |  12 +-
 .../windowing/util/SessionWindowingData.java    |   3 +
 .../util/TopSpeedWindowingExampleData.java      |   3 +
 .../examples/wordcount/PojoExample.java         |  14 +-
 .../streaming/examples/wordcount/WordCount.java |  16 +-
 .../scala/examples/join/WindowJoin.scala        |   2 +-
 .../iteration/IterateExampleITCase.java         |   4 +-
 .../join/WindowJoinData.java                    |   2 +-
 .../join/WindowJoinITCase.java                  |  19 +-
 .../ml/IncrementalLearningSkeletonITCase.java   |   3 +
 .../twitter/TwitterStreamITCase.java            |   3 +
 .../windowing/SessionWindowingITCase.java       |   3 +
 .../TopSpeedWindowingExampleITCase.java         |   5 +-
 .../windowing/WindowWordCountITCase.java        |   3 +
 .../wordcount/PojoExampleITCase.java            |   3 +
 .../wordcount/WordCountITCase.java              |   3 +
 .../TopSpeedWindowingExampleITCase.java         |   5 +-
 .../socket/SocketWindowWordCountITCase.java     |  42 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../scala/examples/WindowJoinITCase.scala       |   6 +-
 .../flink/table/examples/java/WordCountSQL.java |  10 +-
 .../table/examples/java/WordCountTable.java     |  21 +-
 flink-examples/pom.xml                          |  50 +-
 71 files changed, 1251 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
index 8e51df8..101eda3 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -18,27 +18,26 @@
 
 package org.apache.flink.examples.java.clustering;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
+
+import java.io.Serializable;
+import java.util.Collection;
 
 /**
  * This example implements a basic K-Means clustering algorithm.
  *
- * <p>
- * K-Means is an iterative clustering algorithm and works as follows:<br>
+ * <p>K-Means is an iterative clustering algorithm and works as follows:<br>
  * K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
  * In each iteration, the algorithm computes the distance of each data point to each cluster center.
  * Each point is assigned to the cluster center which is closest to it.
@@ -48,13 +47,11 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
  * or if cluster centers do not (significantly) move in an iteration.<br>
  * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>.
  *
- * <p>
- * This implementation works on two-dimensional data points. <br>
+ * <p>This implementation works on two-dimensional data points. <br>
  * It computes an assignment of data points to cluster centers, i.e.,
  * each data point is annotated with the id of the final cluster (center) it belongs to.
  *
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ * <p>Input files are plain text files and must be formatted as follows:
  * <ul>
  * <li>Data points are represented as two double values separated by a blank character.
  * Data points are separated by newline characters.<br>
@@ -63,12 +60,10 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
  * For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
  * </ul>
  *
- * <p>
- * Usage: <code>KMeans --points &lt;path&gt; --centroids &lt;path&gt; --output &lt;path&gt; --iterations &lt;n&gt;</code><br>
+ * <p>Usage: <code>KMeans --points &lt;path&gt; --centroids &lt;path&gt; --output &lt;path&gt; --iterations &lt;n&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations.
  *
- * <p>
- * This example shows how to use:
+ * <p>This example shows how to use:
  * <ul>
  * <li>Bulk iterations
  * <li>Broadcast variables in bulk iterations
@@ -187,7 +182,7 @@ public class KMeans {
 		}
 
 		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+			return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
 		}
 
 		public void clear() {
@@ -210,7 +205,7 @@ public class KMeans {
 		public Centroid() {}
 
 		public Centroid(int id, double x, double y) {
-			super(x,y);
+			super(x, y);
 			this.id = id;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
index e165612..24c30a8 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
@@ -92,7 +92,7 @@ public class KMeansData {
 		}
 		return env.fromCollection(centroidList);
 	}
-	
+
 	public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
 		List<Point> pointList = new LinkedList<Point>();
 		for (Object[] point : POINTS) {
@@ -100,5 +100,5 @@ public class KMeansData {
 		}
 		return env.fromCollection(pointList);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
index 8f48d0a..9f7c98d 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.examples.java.clustering.util;
 
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.clustering.KMeans;
+
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
@@ -27,18 +29,15 @@ import java.text.DecimalFormat;
 import java.util.Locale;
 import java.util.Random;
 
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.examples.java.clustering.KMeans;
-
 /**
  * Generates data for the {@link KMeans} example program.
  */
 public class KMeansDataGenerator {
-	
+
 	static {
 		Locale.setDefault(Locale.US);
 	}
-	
+
 	private static final String CENTERS_FILE = "centers";
 	private static final String POINTS_FILE = "points";
 	private static final long DEFAULT_SEED = 4650285087650871364L;
@@ -50,14 +49,14 @@ public class KMeansDataGenerator {
 
 	/**
 	 * Main method to generate data for the {@link KMeans} example program.
-	 * <p>
-	 * The generator creates to files:
+	 *
+	 * <p>The generator creates two files:
 	 * <ul>
 	 * <li><code>&lt; output-path &gt;/points</code> for the data points
 	 * <li><code>&lt; output-path &gt;/centers</code> for the cluster centers
-	 * </ul> 
-	 * 
-	 * @param args 
+	 * </ul>
+	 *
+	 * @param args
 	 * <ol>
 	 * <li>Int: Number of data points
 	 * <li>Int: Number of cluster centers
@@ -87,22 +86,21 @@ public class KMeansDataGenerator {
 		final double range = params.getDouble("range", DEFAULT_VALUE_RANGE);
 		final long firstSeed = params.getLong("seed", DEFAULT_SEED);
 
-		
 		final double absoluteStdDev = stddev * range;
 		final Random random = new Random(firstSeed);
-		
+
 		// the means around which data points are distributed
 		final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-		
+
 		// write the points out
 		BufferedWriter pointsOut = null;
 		try {
-			pointsOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+POINTS_FILE)));
+			pointsOut = new BufferedWriter(new FileWriter(new File(outDir + "/" + POINTS_FILE)));
 			StringBuilder buffer = new StringBuilder();
-			
+
 			double[] point = new double[DIMENSIONALITY];
 			int nextCentroid = 0;
-			
+
 			for (int i = 1; i <= numDataPoints; i++) {
 				// generate a point for the current centroid
 				double[] centroid = means[nextCentroid];
@@ -118,15 +116,15 @@ public class KMeansDataGenerator {
 				pointsOut.close();
 			}
 		}
-		
+
 		// write the uniformly distributed centers to a file
 		BufferedWriter centersOut = null;
 		try {
-			centersOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+CENTERS_FILE)));
+			centersOut = new BufferedWriter(new FileWriter(new File(outDir + "/" + CENTERS_FILE)));
 			StringBuilder buffer = new StringBuilder();
-			
+
 			double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-			
+
 			for (int i = 0; i < k; i++) {
 				writeCenter(i + 1, centers[i], buffer, centersOut);
 			}
@@ -136,41 +134,41 @@ public class KMeansDataGenerator {
 				centersOut.close();
 			}
 		}
-		
-		System.out.println("Wrote "+numDataPoints+" data points to "+outDir+"/"+POINTS_FILE);
-		System.out.println("Wrote "+k+" cluster centers to "+outDir+"/"+CENTERS_FILE);
+
+		System.out.println("Wrote " + numDataPoints + " data points to " + outDir + "/" + POINTS_FILE);
+		System.out.println("Wrote " + k + " cluster centers to " + outDir + "/" + CENTERS_FILE);
 	}
-	
+
 	private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
 		final double halfRange = range / 2;
 		final double[][] points = new double[num][dimensionality];
-		
+
 		for (int i = 0; i < num; i++) {
-			for (int dim = 0; dim < dimensionality; dim ++) {
+			for (int dim = 0; dim < dimensionality; dim++) {
 				points[i][dim] = (rnd.nextDouble() * range) - halfRange;
 			}
 		}
 		return points;
 	}
-	
+
 	private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
 		buffer.setLength(0);
-		
+
 		// write coordinates
 		for (int j = 0; j < coordinates.length; j++) {
 			buffer.append(FORMAT.format(coordinates[j]));
-			if(j < coordinates.length - 1) {
+			if (j < coordinates.length - 1) {
 				buffer.append(DELIMITER);
 			}
 		}
-		
+
 		out.write(buffer.toString());
 		out.newLine();
 	}
-	
+
 	private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
 		buffer.setLength(0);
-		
+
 		// write id
 		buffer.append(id);
 		buffer.append(DELIMITER);
@@ -178,11 +176,11 @@ public class KMeansDataGenerator {
 		// write coordinates
 		for (int j = 0; j < coordinates.length; j++) {
 			buffer.append(FORMAT.format(coordinates[j]));
-			if(j < coordinates.length - 1) {
+			if (j < coordinates.length - 1) {
 				buffer.append(DELIMITER);
 			}
 		}
-		
+
 		out.write(buffer.toString());
 		out.newLine();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
index 82f1c52..a358490 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.examples.java.distcp;
 
-import org.apache.commons.io.IOUtils;
-
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -37,6 +35,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,12 +51,12 @@ import java.util.Map;
  * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
  * with a dynamic input format
  * Note that this tool does not deal with retriability. Additionally, empty directories are not copied over.
- * <p>
- * When running locally, local file systems paths can be used.
+ *
+ * <p>When running locally, local file systems paths can be used.
  * However, in a distributed environment HDFS paths must be provided both as input and output.
  */
 public class DistCp {
-	
+
 	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
 	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
 	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
@@ -100,7 +99,6 @@ public class DistCp {
 				new FileCopyTaskInputFormat(tasks),
 				new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
 
-
 		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
 
 			private static final long serialVersionUID = 1109254230243989929L;
@@ -139,7 +137,7 @@ public class DistCp {
 					IOUtils.closeQuietly(inputStream);
 					IOUtils.closeQuietly(outputStream);
 				}
-				fileCounter.add(1l);
+				fileCounter.add(1L);
 			}
 		});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
index 7f38a8b..4a8f38b 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -18,18 +18,19 @@
 
 package org.apache.flink.examples.java.distcp;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.core.fs.Path;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.Serializable;
 
 /**
- * A Java POJO that represents a task for copying a single file
+ * A Java POJO that represents a task for copying a single file.
  */
 public class FileCopyTask implements Serializable {
-	
+
 	private static final long serialVersionUID = -8760082278978316032L;
-	
+
 	private final Path path;
 	private final String relativePath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index d6e6713..dfd9bf0 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -35,14 +35,14 @@ import java.util.Queue;
 
 /**
  * An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
- * that have finished previously assigned tasks
+ * that have finished previously assigned tasks.
  */
 public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
 
 	private static final long serialVersionUID = -644394866425221151L;
-	
+
 	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
-	
+
 
 	private final List<FileCopyTask> tasks;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
index 33943b6..b7ec0c9 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -21,12 +21,12 @@ package org.apache.flink.examples.java.distcp;
 import org.apache.flink.core.io.InputSplit;
 
 /**
- * Implementation of {@code InputSplit} for copying files
+ * Implementation of {@code InputSplit} for copying files.
  */
 public class FileCopyTaskInputSplit implements InputSplit {
-	
+
 	private static final long serialVersionUID = -7621656017747660450L;
-	
+
 	private final FileCopyTask task;
 	private final int splitNumber;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index 3bd6522..9568b31 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -16,69 +16,63 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.examples.java.graph;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 /**
  * An implementation of the connected components algorithm, using a delta iteration.
- * 
- * <p>
- * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
+ *
+ * <p>Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum of its own ID and its
  * neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
  * same component will have the same ID.
- * 
- * <p>
- * A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
+ *
+ * <p>A vertex whose component ID did not change need not propagate its information in the next step. Because of that,
  * the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
  * their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
  * changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
  * is consequently also the next workset.<br>
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
  * <ul>
- * <li>Vertices represented as IDs and separated by new-line characters.<br> 
+ * <li>Vertices represented as IDs and separated by new-line characters.<br>
  * For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63).
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
  * characters. Edges are separated by new-line characters.<br>
  * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
  * </ul>
- * 
- * <p>
- * Usage: <code>ConnectedComponents --vertices &lt;path&gt; --edges &lt;path&gt; --output &lt;path&gt; --iterations &lt;n&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations. 
- * 
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>ConnectedComponents --vertices &lt;path&gt; --edges &lt;path&gt; --output &lt;path&gt; --iterations &lt;n&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations.
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li>Delta Iterations
- * <li>Generic-typed Functions 
+ * <li>Generic-typed Functions
  * </ul>
  */
 @SuppressWarnings("serial")
 public class ConnectedComponents {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String... args) throws Exception {
 
 		// Checking input parameters
@@ -91,19 +85,19 @@ public class ConnectedComponents {
 
 		// make parameters available in the web interface
 		env.getConfig().setGlobalJobParameters(params);
-		
+
 		// read vertex and edge data
 		DataSet<Long> vertices = getVertexDataSet(env, params);
 		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env, params).flatMap(new UndirectEdge());
-		
+
 		// assign the initial components (equal to the vertex id)
 		DataSet<Tuple2<Long, Long>> verticesWithInitialId =
 			vertices.map(new DuplicateValue<Long>());
-				
+
 		// open a delta iteration
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
 				verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
-		
+
 		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
 		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
 				.groupBy(0).aggregate(Aggregations.MIN, 1)
@@ -112,7 +106,7 @@ public class ConnectedComponents {
 
 		// close the delta iteration (delta and new workset are identical)
 		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-		
+
 		// emit result
 		if (params.has("output")) {
 			result.writeAsCsv(params.get("output"), "\n", " ");
@@ -123,29 +117,29 @@ public class ConnectedComponents {
 			result.print();
 		}
 	}
-	
+
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
-	
+
 	/**
 	 * Function that turns a value into a 2-tuple where both fields are that value.
 	 */
 	@ForwardedFields("*->f0")
 	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
-		
+
 		@Override
 		public Tuple2<T, T> map(T vertex) {
 			return new Tuple2<T, T>(vertex, vertex);
 		}
 	}
-	
+
 	/**
 	 * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
 	 */
 	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-		
+
 		@Override
 		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
 			invertedEdge.f0 = edge.f1;
@@ -154,7 +148,7 @@ public class ConnectedComponents {
 			out.collect(invertedEdge);
 		}
 	}
-	
+
 	/**
 	 * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
 	 * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
@@ -169,9 +163,11 @@ public class ConnectedComponents {
 			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
 		}
 	}
-	
-
 
+	/**
+	 * Emit the candidate (Vertex-ID, Component-ID) pair if and only if the
+	 * candidate component ID is less than the vertex's current component ID.
+	 */
 	@ForwardedFieldsFirst("*")
 	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
@@ -211,6 +207,4 @@ public class ConnectedComponents {
 			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
 		}
 	}
-	
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
index 5fbb321..2c553e4 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
@@ -39,17 +39,15 @@ import java.util.List;
 /**
  * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
  * A triangle consists of three edges that connect three vertices with each other.
- * 
- * <p>
- * The algorithm works as follows: 
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
+ *
+ * <p>The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
  * that closes the triangle.
- *  
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
  * <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space
  * characters. Edges are separated by new-line characters.<br>
  * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
  * that include a triangle
@@ -59,17 +57,15 @@ import java.util.List;
  *     /  \
  *   (2)-(12)
  * </pre>
- * 
- * Usage: <code>EnumTriangleBasic --edges &lt;path&gt; --output &lt;path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. 
- * 
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>EnumTriangleBasic --edges &lt;path&gt; --output &lt;path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li>Custom Java objects which extend Tuple
  * <li>Group Sorting
  * </ul>
- * 
  */
 @SuppressWarnings("serial")
 public class EnumTriangles {
@@ -77,7 +73,7 @@ public class EnumTriangles {
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 
 		// Checking input parameters
@@ -88,7 +84,7 @@ public class EnumTriangles {
 
 		// make parameters available in the web interface
 		env.getConfig().setGlobalJobParameters(params);
-	
+
 		// read input data
 		DataSet<Edge> edges;
 		if (params.has("edges")) {
@@ -106,7 +102,7 @@ public class EnumTriangles {
 		// project edges by vertex id
 		DataSet<Edge> edgesById = edges
 				.map(new EdgeByIdProjector());
-		
+
 		DataSet<Triad> triangles = edgesById
 				// build triads
 				.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
@@ -128,60 +124,60 @@ public class EnumTriangles {
 	//     USER FUNCTIONS
 	// *************************************************************************
 
-	/** Converts a Tuple2 into an Edge */
+	/** Converts a Tuple2 into an Edge. */
 	@ForwardedFields("0;1")
 	public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
 		private final Edge outEdge = new Edge();
-		
+
 		@Override
 		public Edge map(Tuple2<Integer, Integer> t) throws Exception {
 			outEdge.copyVerticesFromTuple2(t);
 			return outEdge;
 		}
 	}
-	
+
 	/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
 	private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
-	
+
 		@Override
 		public Edge map(Edge inEdge) throws Exception {
-			
+
 			// flip vertices if necessary
-			if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
+			if (inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
 				inEdge.flipVertices();
 			}
-			
+
 			return inEdge;
 		}
 	}
-	
+
 	/**
 	 *  Builds triads (triples of vertices) from pairs of edges that share a vertex.
-	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. 
+	 *  The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
 	 *  Assumes that input edges share the first vertex and are in ascending order of the second vertex.
 	 */
 	@ForwardedFields("0")
 	private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
 		private final List<Integer> vertices = new ArrayList<Integer>();
 		private final Triad outTriad = new Triad();
-		
+
 		@Override
 		public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
-			
+
 			final Iterator<Edge> edges = edgesIter.iterator();
-			
+
 			// clear vertex list
 			vertices.clear();
-			
+
 			// read first edge
 			Edge firstEdge = edges.next();
 			outTriad.setFirstVertex(firstEdge.getFirstVertex());
 			vertices.add(firstEdge.getSecondVertex());
-			
+
 			// build and emit triads
 			while (edges.hasNext()) {
 				Integer higherVertexId = edges.next().getSecondVertex();
-				
+
 				// combine vertex with all previously read vertices
 				for (Integer lowerVertexId : vertices) {
 					outTriad.setSecondVertex(lowerVertexId);
@@ -192,14 +188,14 @@ public class EnumTriangles {
 			}
 		}
 	}
-	
+
 	/** Filters triads (three vertices connected by two edges) without a closing third edge. */
 	private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
-		
+
 		@Override
 		public Triad join(Triad triad, Edge edge) throws Exception {
 			return triad;
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
index 33305af..f22f2e6 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
@@ -38,32 +38,28 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 
 /**
  * A basic implementation of the Page Rank algorithm using a bulk iteration.
- * 
- * <p>
- * This implementation requires a set of pages and a set of directed links as input and works as follows. <br> 
+ *
+ * <p>This implementation requires a set of pages and a set of directed links as input and works as follows. <br>
  * In each iteration, the rank of every page is evenly distributed to all pages it points to.
  * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
  * The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
  * This implementation terminates after a fixed number of iterations.<br>
- * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>. 
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>.
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
  * <ul>
- * <li>Pages represented as an (long) ID separated by new-line characters.<br> 
+ * <li>Pages represented as an (long) ID separated by new-line characters.<br>
  * For example <code>"1\n2\n12\n42\n63"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
- * <li>Links are represented as pairs of page IDs which are separated by space 
+ * <li>Links are represented as pairs of page IDs which are separated by space
  * characters. Links are separated by new-line characters.<br>
  * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (directed) links (1)-&gt;(2), (2)-&gt;(12), (1)-&gt;(12), and (42)-&gt;(63).<br>
  * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
  * </ul>
- * 
- * <p>
- * Usage: <code>PageRankBasic --pages &lt;path&gt; --links &lt;path&gt; --output &lt;path&gt; --numPages &lt;n&gt; --iterations &lt;n&gt;</code><br>
+ *
+ * <p>Usage: <code>PageRankBasic --pages &lt;path&gt; --links &lt;path&gt; --output &lt;path&gt; --numPages &lt;n&gt; --iterations &lt;n&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.PageRankData} and 10 iterations.
- * 
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li>Bulk Iterations
  * <li>Default Join
@@ -72,42 +68,42 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
  */
 @SuppressWarnings("serial")
 public class PageRank {
-	
+
 	private static final double DAMPENING_FACTOR = 0.85;
 	private static final double EPSILON = 0.0001;
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 
 		ParameterTool params = ParameterTool.fromArgs(args);
 
 		final int numPages = params.getInt("numPages", PageRankData.getNumberOfPages());
 		final int maxIterations = params.getInt("iterations", 10);
-		
+
 		// set up execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		// make the parameters available to the web ui
 		env.getConfig().setGlobalJobParameters(params);
-		
+
 		// get input data
 		DataSet<Long> pagesInput = getPagesDataSet(env, params);
 		DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env, params);
-		
+
 		// assign initial rank to pages
 		DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
 				map(new RankAssigner((1.0d / numPages)));
-		
+
 		// build adjacency list from link input
-		DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
+		DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
 				linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
-		
+
 		// set iterative data set
 		IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
-		
+
 		DataSet<Tuple2<Long, Double>> newRanks = iteration
 				// join pages with outgoing edges and distribute rank
 				.join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
@@ -115,9 +111,9 @@ public class PageRank {
 				.groupBy(0).aggregate(SUM, 1)
 				// apply dampening factor
 				.map(new Dampener(DAMPENING_FACTOR, numPages));
-		
+
 		DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
-				newRanks, 
+				newRanks,
 				newRanks.join(iteration).where(0).equalTo(0)
 				// termination condition
 				.filter(new EpsilonFilter()));
@@ -131,45 +127,43 @@ public class PageRank {
 			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			finalPageRanks.print();
 		}
-
-		
 	}
 
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
 
-	/** 
-	 * A map function that assigns an initial rank to all pages. 
+	/**
+	 * A map function that assigns an initial rank to all pages.
 	 */
 	public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
 		Tuple2<Long, Double> outPageWithRank;
-		
+
 		public RankAssigner(double rank) {
 			this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank);
 		}
-		
+
 		@Override
 		public Tuple2<Long, Double> map(Long page) {
 			outPageWithRank.f0 = page;
 			return outPageWithRank;
 		}
 	}
-	
+
 	/**
 	 * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
 	 * originate. Run as a pre-processing step.
 	 */
 	@ForwardedFields("0")
 	public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
-		
+
 		private final ArrayList<Long> neighbors = new ArrayList<Long>();
-		
+
 		@Override
 		public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
 			neighbors.clear();
 			Long id = 0L;
-			
+
 			for (Tuple2<Long, Long> n : values) {
 				id = n.f0;
 				neighbors.add(n.f1);
@@ -177,7 +171,7 @@ public class PageRank {
 			out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
 		}
 	}
-	
+
 	/**
 	 * Join function that distributes a fraction of a vertex's rank to all neighbors.
 	 */
@@ -194,16 +188,16 @@ public class PageRank {
 			}
 		}
 	}
-	
+
 	/**
-	 * The function that applies the page rank dampening formula
+	 * The function that applies the page rank dampening formula.
 	 */
 	@ForwardedFields("0")
-	public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+	public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		private final double dampening;
 		private final double randomJump;
-		
+
 		public Dampener(double dampening, double numVertices) {
 			this.dampening = dampening;
 			this.randomJump = (1 - dampening) / numVertices;
@@ -215,7 +209,7 @@ public class PageRank {
 			return value;
 		}
 	}
-	
+
 	/**
 	 * Filter that filters vertices where the rank difference is below a threshold.
 	 */
@@ -226,11 +220,11 @@ public class PageRank {
 			return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
 		}
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env, ParameterTool params) {
 		if (params.has("pages")) {
 			return env.readCsvFile(params.get("pages"))

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index 50e86ec..2857a0c 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -32,6 +32,15 @@ import org.apache.flink.util.Collector;
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * The transitive closure of a graph contains an edge for each pair of vertices
+ * which are endpoints of at least one path in the graph.
+ *
+ * <p>This algorithm is implemented using a delta iteration. The transitive
+ * closure solution set is grown in each step by joining the workset of newly
+ * discovered path endpoints with the original graph edges and discarding
+ * previously discovered path endpoints (already in the solution set).
+ */
 @SuppressWarnings("serial")
 public class TransitiveClosureNaive {
 
@@ -57,9 +66,9 @@ public class TransitiveClosureNaive {
 			edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);
 		}
 
-		IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);
+		IterativeDataSet<Tuple2<Long, Long>> paths = edges.iterate(maxIterations);
 
-		DataSet<Tuple2<Long,Long>> nextPaths = paths
+		DataSet<Tuple2<Long, Long>> nextPaths = paths
 				.join(edges)
 				.where(1)
 				.equalTo(0)
@@ -83,17 +92,17 @@ public class TransitiveClosureNaive {
 					}
 				}).withForwardedFields("0;1");
 
-		DataSet<Tuple2<Long,Long>> newPaths = paths
+		DataSet<Tuple2<Long, Long>> newPaths = paths
 				.coGroup(nextPaths)
 				.where(0).equalTo(0)
 				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-					Set<Tuple2<Long,Long>> prevSet = new HashSet<Tuple2<Long,Long>>();
+					Set<Tuple2<Long, Long>> prevSet = new HashSet<Tuple2<Long, Long>>();
 					@Override
 					public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
-						for (Tuple2<Long,Long> prev : prevPaths) {
+						for (Tuple2<Long, Long> prev : prevPaths) {
 							prevSet.add(prev);
 						}
-						for (Tuple2<Long,Long> next: nextPaths) {
+						for (Tuple2<Long, Long> next: nextPaths) {
 							if (!prevSet.contains(next)) {
 								out.collect(next);
 							}
@@ -103,7 +112,6 @@ public class TransitiveClosureNaive {
 
 		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
 
-
 		// emit result
 		if (params.has("output")) {
 			transitiveClosure.writeAsCsv(params.get("output"), "\n", " ");

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
index dd1f596..6fb39d8 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.examples.java.graph.util;
 
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Provides the default data sets used for the Connected Components example program.
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  *
  */
 public class ConnectedComponentsData {
-	
+
 	public static final long[] VERTICES  = new long[] {
 			1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
@@ -42,7 +42,7 @@ public class ConnectedComponentsData {
 		}
 		return env.fromCollection(verticesList);
 	}
-	
+
 	public static final Object[][] EDGES = new Object[][] {
 		new Object[]{1L, 2L},
 		new Object[]{2L, 3L},
@@ -59,14 +59,14 @@ public class ConnectedComponentsData {
 		new Object[]{1L, 15L},
 		new Object[]{16L, 1L}
 	};
-	
+
 	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
 		for (Object[] edge : EDGES) {
 			edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
 		}
 		return env.fromCollection(edgeList);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
index a54b3da..cc3c3ac 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.examples.java.graph.util;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Provides the default data sets used for the Triangle Enumeration example programs.
  * The default data sets are used, if no parameters are given to the program.
@@ -36,7 +35,7 @@ public class EnumTrianglesData {
 	public static final Object[][] EDGES = {
 		{1, 2},
 		{1, 3},
-		{1 ,4},
+		{1, 4},
 		{1, 5},
 		{2, 3},
 		{2, 5},
@@ -46,14 +45,14 @@ public class EnumTrianglesData {
 		{5, 6},
 		{7, 8}
 	};
-	
+
 	public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		
+
 		List<EnumTrianglesDataTypes.Edge> edges = new ArrayList<EnumTrianglesDataTypes.Edge>();
-		for(Object[] e : EDGES) {
-			edges.add(new Edge((Integer)e[0], (Integer)e[1]));
+		for (Object[] e : EDGES) {
+			edges.add(new Edge((Integer) e[0], (Integer) e[1]));
 		}
-		
+
 		return env.fromCollection(edges);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
index 5c6e8b0..0aa0ab0 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
@@ -22,62 +22,88 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 
+/**
+ * The data classes for EnumTriangles.
+ */
 public class EnumTrianglesDataTypes {
 
+	/**
+	 * A POJO storing two vertex IDs.
+	 */
 	public static class Edge extends Tuple2<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
-		
+
 		public static final int V1 = 0;
 		public static final int V2 = 1;
-		
+
 		public Edge() {}
-		
+
 		public Edge(final Integer v1, final Integer v2) {
 			this.setFirstVertex(v1);
 			this.setSecondVertex(v2);
 		}
-		
-		public Integer getFirstVertex() { return this.getField(V1); }
-		
-		public Integer getSecondVertex() { return this.getField(V2); }
-		
-		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-		
-		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-		
+
+		public Integer getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public Integer getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public void setFirstVertex(final Integer vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final Integer vertex2) {
+			this.setField(vertex2, V2);
+		}
+
 		public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
 			this.setFirstVertex(t.f0);
 			this.setSecondVertex(t.f1);
 		}
-		
+
 		public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) {
 			this.setFirstVertex(ewd.getFirstVertex());
 			this.setSecondVertex(ewd.getSecondVertex());
 		}
-		
+
 		public void flipVertices() {
 			Integer tmp = this.getFirstVertex();
 			this.setFirstVertex(this.getSecondVertex());
 			this.setSecondVertex(tmp);
 		}
 	}
-	
+
+	/**
+	 * A POJO storing three vertex IDs.
+	 */
 	public static class Triad extends Tuple3<Integer, Integer, Integer> {
 		private static final long serialVersionUID = 1L;
-		
+
 		public static final int V1 = 0;
 		public static final int V2 = 1;
 		public static final int V3 = 2;
-		
+
 		public Triad() {}
-		
-		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-		
-		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-		
-		public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); }
+
+		public void setFirstVertex(final Integer vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final Integer vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setThirdVertex(final Integer vertex3) {
+			this.setField(vertex3, V3);
+		}
 	}
-	
+
+	/**
+	 * A POJO storing two vertex IDs with degree.
+	 */
 	public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
@@ -85,25 +111,41 @@ public class EnumTrianglesDataTypes {
 		public static final int V2 = 1;
 		public static final int D1 = 2;
 		public static final int D2 = 3;
-		
+
 		public EdgeWithDegrees() { }
-			
-		public Integer getFirstVertex() { return this.getField(V1); }
-		
-		public Integer getSecondVertex() { return this.getField(V2); }
-		
-		public Integer getFirstDegree() { return this.getField(D1); }
-		
-		public Integer getSecondDegree() { return this.getField(D2); }
-		
-		public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-		
-		public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-		
-		public void setFirstDegree(final Integer degree1) { this.setField(degree1, D1); }
-		
-		public void setSecondDegree(final Integer degree2) { this.setField(degree2, D2); }
-		
+
+		public Integer getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public Integer getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public Integer getFirstDegree() {
+			return this.getField(D1);
+		}
+
+		public Integer getSecondDegree() {
+			return this.getField(D2);
+		}
+
+		public void setFirstVertex(final Integer vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final Integer vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setFirstDegree(final Integer degree1) {
+			this.setField(degree1, D1);
+		}
+
+		public void setSecondDegree(final Integer degree2) {
+			this.setField(degree2, D2);
+		}
+
 		public void copyFrom(final EdgeWithDegrees edge) {
 			this.setFirstVertex(edge.getFirstVertex());
 			this.setSecondVertex(edge.getSecondVertex());
@@ -111,6 +153,4 @@ public class EnumTrianglesDataTypes {
 			this.setSecondDegree(edge.getSecondDegree());
 		}
 	}
-	
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
index f2d9078..1c0bde7 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.examples.java.graph.util;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Provides the default data sets used for the PageRank example program.
  * The default data sets are used, if no parameters are given to the program.
@@ -63,24 +63,24 @@ public class PageRankData {
 		{14L, 12L},
 		{15L, 1L},
 	};
-	
+
 	private static int numPages = 15;
-	
+
 	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		
+
 		List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
-		for(Object[] e : EDGES) {
-			edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
+		for (Object[] e : EDGES) {
+			edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
 		}
 		return env.fromCollection(edges);
 	}
-	
+
 	public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
 		return env.generateSequence(1, 15);
 	}
-	
+
 	public static int getNumberOfPages() {
 		return numPages;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
index 44b566b..79ac9ec 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
@@ -18,39 +18,41 @@
 
 package org.apache.flink.examples.java.misc;
 
-import java.util.List;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-/** 
+import java.util.List;
+
+/**
  * This example shows how to use the collection based execution of Flink.
- * 
- * The collection based execution is a local mode that is not using the full Flink runtime.
+ *
+ * <p>The collection based execution is a local mode that is not using the full Flink runtime.
  * DataSet transformations are executed on Java collections.
- * 
- * See the "Local Execution" section in the documentation for more details: 
+ *
+ * <p>See the "Local Execution" section in the documentation for more details:
  * 	http://flink.apache.org/docs/latest/apis/local_execution.html
- * 
  */
 public class CollectionExecutionExample {
-	
+
 	/**
-	 * POJO class representing a user
+	 * POJO class representing a user.
 	 */
 	public static class User {
 		public int userIdentifier;
 		public String name;
+
 		public User() {}
+
 		public User(int userIdentifier, String name) {
 			this.userIdentifier = userIdentifier; this.name = name;
 		}
+
 		public String toString() {
-			return "User{userIdentifier="+userIdentifier+" name="+name+"}";
+			return "User{userIdentifier=" + userIdentifier + " name=" + name + "}";
 		}
 	}
-	
+
 	/**
 	 * POJO for an EMail.
 	 */
@@ -58,36 +60,40 @@ public class CollectionExecutionExample {
 		public int userId;
 		public String subject;
 		public String body;
+
 		public EMail() {}
+
 		public EMail(int userId, String subject, String body) {
 			this.userId = userId; this.subject = subject; this.body = body;
 		}
+
 		public String toString() {
-			return "eMail{userId="+userId+" subject="+subject+" body="+body+"}";
+			return "eMail{userId=" + userId + " subject=" + subject + " body=" + body + "}";
 		}
-		
+
 	}
+
 	public static void main(String[] args) throws Exception {
 		// initialize a new Collection-based execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-		
+
 		// create objects for users and emails
 		User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
-		
+
 		EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
 							new EMail(1, "Re: Meeting", "Sorry, I'm not availble"),
 							new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
-		
+
 		// convert objects into a DataSet
 		DataSet<User> users = env.fromElements(usersArray);
 		DataSet<EMail> emails = env.fromElements(emailsArray);
-		
+
 		// join the two DataSets
-		DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
-		
+		DataSet<Tuple2<User, EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
+
 		// retrieve the resulting Tuple2 elements into a ArrayList.
-		List<Tuple2<User,EMail>> result = joined.collect();
-		
+		List<Tuple2<User, EMail>> result = joined.collect();
+
 		// Do some work with the resulting ArrayList (=Collection).
 		for (Tuple2<User, EMail> t : result) {
 			System.err.println("Result = " + t);

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index fc85110..f33d095 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -23,39 +23,38 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
-/** 
+/**
  * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle 
+ * The area of a circle is Pi * R^2, R being the radius of the circle
  * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- * 
- * Thus Pi = 4 * (area of circle / area of square).
- * 
- * The idea is to find a way to estimate the circle to square area ratio.
+ *
+ * <p>Thus Pi = 4 * (area of circle / area of square).
+ *
+ * <p>The idea is to find a way to estimate the circle to square area ratio.
  * The Monte Carlo method suggests collecting random points (within the square)
  * and then counting the number of points that fall within the circle
- * 
+ *
  * <pre>
  * {@code
  * x = Math.random()
  * y = Math.random()
- * 
+ *
  * x * x + y * y < 1
  * }
  * </pre>
  */
 @SuppressWarnings("serial")
 public class PiEstimation implements java.io.Serializable {
-	
-	
+
 	public static void main(String[] args) throws Exception {
 
 		final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// count how many of the samples would randomly fall into
 		// the unit circle
-		DataSet<Long> count = 
+		DataSet<Long> count =
 				env.generateSequence(1, numSamples)
 				.map(new Sampler())
 				.reduce(new SumReducer());
@@ -68,9 +67,9 @@ public class PiEstimation implements java.io.Serializable {
 	//*************************************************************************
 	//     USER FUNCTIONS
 	//*************************************************************************
-	
-	
-	/** 
+
+
+	/**
 	 * Sampler randomly emits points that fall within a square of edge x * y.
 	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
 	 * If the distance is less than 1, then and only then does it returns a 1.
@@ -85,8 +84,8 @@ public class PiEstimation implements java.io.Serializable {
 		}
 	}
 
-	
-	/** 
+
+	/**
 	 * Simply sums up all long values.
 	 */
 	public static final class SumReducer implements ReduceFunction<Long>{

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 90ad67a..7e9d41d 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -18,45 +18,41 @@
 
 package org.apache.flink.examples.java.ml;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.examples.java.ml.util.LinearRegressionData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
+
+import java.io.Serializable;
+import java.util.Collection;
 
 /**
  * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
  *
- * <p>
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
+ * <p>Linear Regression with the BGD (batch gradient descent) algorithm is an iterative optimization algorithm and works as follows:<br>
  * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
  * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
  * The algorithm terminates after a fixed number of iterations (as in this implementation)
  * With enough iteration, the algorithm can minimize the cost function and find the best parameters
  * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- * 
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
+ *
+ * <p>This implementation works on one-dimensional data and finds the two-dimensional theta.<br>
  * It find the best Theta parameter to fit the target.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
  * <ul>
  * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
  * Data points are separated by newline characters.<br>
  * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
  * </ul>
- * 
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li> Bulk iterations
  * <li> Broadcast variables in bulk iterations
@@ -102,7 +98,7 @@ public class LinearRegression {
 		// set number of bulk iterations for SGD linear Regression
 		IterativeDataSet<Params> loop = parameters.iterate(iterations);
 
-		DataSet<Params> new_parameters = data
+		DataSet<Params> newParameters = data
 				// compute a single step using every sample
 				.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
 				// sum up all the steps
@@ -111,10 +107,10 @@ public class LinearRegression {
 				.map(new Update());
 
 		// feed new parameters back into next iteration
-		DataSet<Params> result = loop.closeWith(new_parameters);
+		DataSet<Params> result = loop.closeWith(newParameters);
 
 		// emit result
-		if(params.has("output")) {
+		if (params.has("output")) {
 			result.writeAsText(params.get("output"));
 			// execute program
 			env.execute("Linear Regression example");
@@ -132,11 +128,11 @@ public class LinearRegression {
 	 * A simple data sample, x means the input, and y means the target.
 	 */
 	public static class Data implements Serializable{
-		public double x,y;
+		public double x, y;
 
-		public Data() {};
+		public Data() {}
 
-		public Data(double x ,double y){
+		public Data(double x, double y) {
 			this.x = x;
 			this.y = y;
 		}
@@ -153,11 +149,11 @@ public class LinearRegression {
 	 */
 	public static class Params implements Serializable{
 
-		private double theta0,theta1;
+		private double theta0, theta1;
 
-		public Params(){};
+		public Params() {}
 
-		public Params(double x0, double x1){
+		public Params(double x0, double x1) {
 			this.theta0 = x0;
 			this.theta1 = x1;
 		}
@@ -183,9 +179,9 @@ public class LinearRegression {
 			this.theta1 = theta1;
 		}
 
-		public Params div(Integer a){
-			this.theta0 = theta0 / a ;
-			this.theta1 = theta1 / a ;
+		public Params div(Integer a) {
+			this.theta0 = theta0 / a;
+			this.theta1 = theta1 / a;
 			return this;
 		}
 
@@ -198,9 +194,9 @@ public class LinearRegression {
 	/**
 	 * Compute a single BGD type update for every parameters.
 	 */
-	public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> {
+	public static class SubUpdate extends RichMapFunction<Data, Tuple2<Params, Integer>> {
 
-		private Collection<Params> parameters; 
+		private Collection<Params> parameters;
 
 		private Params parameter;
 
@@ -215,18 +211,18 @@ public class LinearRegression {
 		@Override
 		public Tuple2<Params, Integer> map(Data in) throws Exception {
 
-			for(Params p : parameters){
-				this.parameter = p; 
+			for (Params p : parameters){
+				this.parameter = p;
 			}
 
-			double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
-			double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
+			double theta0 = parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y);
+			double theta1 = parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x);
 
-			return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count);
+			return new Tuple2<Params, Integer>(new Params(theta0, theta1), count);
 		}
 	}
 
-	/**  
+	/**
 	 * Accumulator all the update.
 	 * */
 	public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
@@ -234,10 +230,10 @@ public class LinearRegression {
 		@Override
 		public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
 
-			double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
-			double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
-			Params result = new Params(new_theta0,new_theta1);
-			return new Tuple2<Params, Integer>( result, val1.f1 + val2.f1);
+			double newTheta0 = val1.f0.theta0 + val2.f0.theta0;
+			double newTheta1 = val1.f0.theta1 + val2.f0.theta1;
+			Params result = new Params(newTheta0, newTheta1);
+			return new Tuple2<Params, Integer>(result, val1.f1 + val2.f1);
 
 		}
 	}
@@ -245,7 +241,7 @@ public class LinearRegression {
 	/**
 	 * Compute the final update by average them.
 	 */
-	public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> {
+	public static class Update implements MapFunction<Tuple2<Params, Integer>, Params> {
 
 		@Override
 		public Params map(Tuple2<Params, Integer> arg0) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
index 838e320..3d0ad03 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
@@ -20,8 +20,8 @@ package org.apache.flink.examples.java.ml.util;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.ml.LinearRegression.Params;
 import org.apache.flink.examples.java.ml.LinearRegression.Data;
+import org.apache.flink.examples.java.ml.LinearRegression.Params;
 
 import java.util.LinkedList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
index a9f9e08..96ee56c 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
@@ -43,13 +43,13 @@ public class LinearRegressionDataGenerator {
 
 	/**
 	 * Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
-	 * <p>
-	 * The generator creates to files:
+	 *
+	 * <p>The generator creates the following file:
 	 * <ul>
 	 * <li><code>{tmp.dir}/data</code> for the data points
-	 * </ul> 
-	 * 
-	 * @param args 
+	 * </ul>
+	 *
+	 * @param args
 	 * <ol>
 	 * <li>Int: Number of data points
 	 * <li><b>Optional</b> Long: Random seed
@@ -72,15 +72,15 @@ public class LinearRegressionDataGenerator {
 		// write the points out
 		BufferedWriter pointsOut = null;
 		try {
-			pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
+			pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir + "/" + POINTS_FILE)));
 			StringBuilder buffer = new StringBuilder();
 
 			// DIMENSIONALITY + 1 means that the number of x(dimensionality) and target y
-			double[] point = new double[DIMENSIONALITY+1];
+			double[] point = new double[DIMENSIONALITY + 1];
 
 			for (int i = 1; i <= numDataPoints; i++) {
 				point[0] = random.nextGaussian();
-				point[1] = 2 * point[0] + 0.01*random.nextGaussian();
+				point[1] = 2 * point[0] + 0.01 * random.nextGaussian();
 				writePoint(point, buffer, pointsOut);
 			}
 
@@ -91,17 +91,16 @@ public class LinearRegressionDataGenerator {
 			}
 		}
 
-		System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
+		System.out.println("Wrote " + numDataPoints + " data points to " + tmpDir + "/" + POINTS_FILE);
 	}
 
-
 	private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
 		buffer.setLength(0);
 
 		// write coordinates
 		for (int j = 0; j < data.length; j++) {
 			buffer.append(FORMAT.format(data[j]));
-			if(j < data.length - 1) {
+			if (j < data.length - 1) {
 				buffer.append(DELIMITER);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 87b5bff..feec3ef 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -18,32 +18,32 @@
 
 package org.apache.flink.examples.java.relational;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 /**
  * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
  * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
  * most contain whitespace characters like space and tab.
- * <p>
- * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
+ *
+ * <p>The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
  * and three columns. See {@link #getDataSet(ExecutionEnvironment, ParameterTool)} for configuration.
- * <p>
- * Usage: <code>EmptyFieldsCountAccumulator --input &lt;path&gt; --output &lt;path&gt;</code> <br>
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>EmptyFieldsCountAccumulator --input &lt;path&gt; --output &lt;path&gt;</code> <br>
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li>custom accumulators
  * <li>tuple data types
@@ -122,7 +122,7 @@ public class EmptyFieldsCountAccumulator {
 
 	/**
 	 * This function filters all incoming tuples that have one or more empty fields.
-	 * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under 
+	 * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
 	 * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
 	 */
 	public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {