You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 11:04:12 UTC

[3/7] flink git commit: [FLINK-6707] [examples] Activate strict checkstyle for flink-examples

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index 14fbc34..c585e82 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -20,67 +20,60 @@ package org.apache.flink.examples.java.relational;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 
 /**
  * This program implements a modified version of the TPC-H query 10.
  * The original query can be found at
  * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- * 
- * <p>
- * This program implements the following SQL equivalent:
- * 
- * <p>
- * <pre>{@code
- * SELECT 
+ *
+ * <p>This program implements the following SQL equivalent:
+ *
+ * <p><pre>{@code
+ * SELECT
  *        c_custkey,
- *        c_name, 
+ *        c_name,
  *        c_address,
- *        n_name, 
+ *        n_name,
  *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
- * FROM   
- *        customer, 
- *        orders, 
- *        lineitem, 
- *        nation 
- * WHERE 
- *        c_custkey = o_custkey 
- *        AND l_orderkey = o_orderkey 
- *        AND YEAR(o_orderdate) > '1990' 
- *        AND l_returnflag = 'R' 
- *        AND c_nationkey = n_nationkey 
- * GROUP BY 
- *        c_custkey, 
- *        c_name, 
- *        c_acctbal, 
- *        n_name, 
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,
+ * FROM
+ *        customer,
+ *        orders,
+ *        lineitem,
+ *        nation
+ * WHERE
+ *        c_custkey = o_custkey
+ *        AND l_orderkey = o_orderkey
+ *        AND YEAR(o_orderdate) > '1990'
+ *        AND l_returnflag = 'R'
+ *        AND c_nationkey = n_nationkey
+ * GROUP BY
+ *        c_custkey,
+ *        c_name,
+ *        c_acctbal,
+ *        n_name,
  *        c_address
  * }</pre>
- *        
- * <p>
- * Compared to the original TPC-H query this version does not print 
+ *
+ * <p>Compared to the original TPC-H query this version does not print
  * c_phone and c_comment, only filters by years greater than 1990 instead of
  * a period of 3 months, and does not sort the result by revenue.
- * 
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ *
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
  * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>TPCHQuery10 --customer &lt;path&gt; --orders &lt;path&gt; --lineitem&lt;path&gt; --nation &lt;path&gt; --output &lt;path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>TPCHQuery10 --customer &lt;path&gt; --orders &lt;path&gt; --lineitem&lt;path&gt; --nation &lt;path&gt; --output &lt;path&gt;</code><br>
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li> tuple data types
  * <li> inline-defined functions
@@ -90,11 +83,11 @@ import org.apache.flink.api.java.utils.ParameterTool;
  */
 @SuppressWarnings("serial")
 public class TPCHQuery10 {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
 
 		final ParameterTool params = ParameterTool.fromArgs(args);
@@ -109,7 +102,7 @@ public class TPCHQuery10 {
 			return;
 		}
 
-		// get customer data set: (custkey, name, address, nationkey, acctbal) 
+		// get customer data set: (custkey, name, address, nationkey, acctbal)
 		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers =
 			getCustomerDataSet(env, params.get("customer"));
 		// get orders data set: (orderkey, custkey, orderdate)
@@ -126,17 +119,17 @@ public class TPCHQuery10 {
 		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
 				// filter by year
 				orders.filter(
-								new FilterFunction<Tuple3<Integer,Integer, String>>() {
+								new FilterFunction<Tuple3<Integer, Integer, String>>() {
 									@Override
 									public boolean filter(Tuple3<Integer, Integer, String> o) {
 										return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
 									}
 								})
 				// project fields out that are no longer required
-				.project(0,1);
+				.project(0, 1);
 
 		// lineitems filtered by flag: (orderkey, revenue)
-		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag = 
+		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
 				// filter by flag
 				lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
 										@Override
@@ -154,24 +147,24 @@ public class TPCHQuery10 {
 					});
 
 		// join orders with lineitems: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueByCustomer = 
+		DataSet<Tuple2<Integer, Double>> revenueByCustomer =
 				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
 									.where(0).equalTo(0)
 									.projectFirst(1).projectSecond(1);
-		
+
 		revenueByCustomer = revenueByCustomer.groupBy(0).aggregate(Aggregations.SUM, 1);
 
 		// join customer with nation (custkey, name, address, nationname, acctbal)
 		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
 						.joinWithTiny(nations)
 						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4);
+						.projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
 
 		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> result = 
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
 				customerWithNation.join(revenueByCustomer)
 				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1);
+				.projectFirst(0, 1, 2, 3, 4).projectSecond(1);
 
 		// emit result
 		if (params.has("output")) {
@@ -182,20 +175,20 @@ public class TPCHQuery10 {
 			System.out.println("Printing result to stdout. Use --output to specify output path.");
 			result.print();
 		}
-		
+
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env, String customerPath) {
 		return env.readCsvFile(customerPath)
 					.fieldDelimiter("|")
 					.includeFields("11110100")
 					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
 	}
-	
+
 	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env, String ordersPath) {
 		return env.readCsvFile(ordersPath)
 					.fieldDelimiter("|")
@@ -209,7 +202,7 @@ public class TPCHQuery10 {
 					.includeFields("1000011010000000")
 					.types(Integer.class, Double.class, Double.class, String.class);
 	}
-	
+
 	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env, String nationPath) {
 		return env.readCsvFile(nationPath)
 					.fieldDelimiter("|")

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index c849764..f416f30 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.examples.java.relational;
 
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.DataSet;
@@ -32,50 +27,49 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.utils.ParameterTool;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
 /**
  * This program implements a modified version of the TPC-H query 3. The
  * example demonstrates how to assign names to fields by extending the Tuple class.
  * The original query can be found at
  * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
  *
- * <p>
- * This program implements the following SQL equivalent:
+ * <p>This program implements the following SQL equivalent:
  *
- * <p>
- * <pre>{@code
- * SELECT 
- *      l_orderkey, 
+ * <p><pre>{@code
+ * SELECT
+ *      l_orderkey,
  *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
+ *      o_orderdate,
+ *      o_shippriority
+ * FROM customer,
+ *      orders,
+ *      lineitem
  * WHERE
- *      c_mktsegment = '[SEGMENT]' 
+ *      c_mktsegment = '[SEGMENT]'
  *      AND c_custkey = o_custkey
  *      AND l_orderkey = o_orderkey
  *      AND o_orderdate < date '[DATE]'
  *      AND l_shipdate > date '[DATE]'
  * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
+ *      l_orderkey,
+ *      o_orderdate,
  *      o_shippriority;
  * }</pre>
  *
- * <p>
- * Compared to the original TPC-H query this version does not sort the result by revenue
+ * <p>Compared to the original TPC-H query this version does not sort the result by revenue
  * and orderdate.
  *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
  * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
  *
- *  <p>
- * Usage: <code>TPCHQuery3 --lineitem&lt;path&gt; --customer &lt;path&gt; --orders&lt;path&gt; --output &lt;path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
+ * <p>Usage: <code>TPCHQuery3 --lineitem&lt;path&gt; --customer &lt;path&gt; --orders&lt;path&gt; --output &lt;path&gt;</code><br>
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li> custom data type derived from tuple data types
  * <li> inline-defined functions
@@ -88,9 +82,9 @@ public class TPCHQuery3 {
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
+
 		final ParameterTool params = ParameterTool.fromArgs(args);
 
 		if (!params.has("lineitem") && !params.has("customer") && !params.has("orders")) {
@@ -109,7 +103,7 @@ public class TPCHQuery3 {
 		DataSet<Lineitem> lineitems = getLineitemDataSet(env, params.get("lineitem"));
 		DataSet<Order> orders = getOrdersDataSet(env, params.get("customer"));
 		DataSet<Customer> customers = getCustomerDataSet(env, params.get("orders"));
-		
+
 		// Filter market segment "AUTOMOBILE"
 		customers = customers.filter(
 								new FilterFunction<Customer>() {
@@ -124,19 +118,19 @@ public class TPCHQuery3 {
 							new FilterFunction<Order>() {
 								private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
 								private final Date date = format.parse("1995-03-12");
-								
+
 								@Override
 								public boolean filter(Order o) throws ParseException {
 									return format.parse(o.getOrderdate()).before(date);
 								}
 							});
-		
+
 		// Filter all Lineitems with l_shipdate > 12.03.1995
 		lineitems = lineitems.filter(
 								new FilterFunction<Lineitem>() {
 									private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
 									private final Date date = format.parse("1995-03-12");
-									
+
 									@Override
 									public boolean filter(Lineitem l) throws ParseException {
 										return format.parse(l.getShipdate()).after(date);
@@ -144,7 +138,7 @@ public class TPCHQuery3 {
 								});
 
 		// Join customers with orders and package them into a ShippingPriorityItem
-		DataSet<ShippingPriorityItem> customerWithOrders = 
+		DataSet<ShippingPriorityItem> customerWithOrders =
 				customers.join(orders).where(0).equalTo(1)
 							.with(
 								new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@@ -154,9 +148,9 @@ public class TPCHQuery3 {
 												o.getShippriority());
 									}
 								});
-		
+
 		// Join the last join result with Lineitems
-		DataSet<ShippingPriorityItem> result = 
+		DataSet<ShippingPriorityItem> result =
 				customerWithOrders.join(lineitems).where(0).equalTo(0)
 									.with(
 											new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@@ -169,7 +163,7 @@ public class TPCHQuery3 {
 								// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
 								.groupBy(0, 2, 3)
 								.aggregate(Aggregations.SUM, 1);
-		
+
 		// emit result
 		if (params.has("output")) {
 			result.writeAsCsv(params.get("output"), "\n", "|");
@@ -185,68 +179,111 @@ public class TPCHQuery3 {
 	// *************************************************************************
 	//     DATA TYPES
 	// *************************************************************************
-	
-	public static class Lineitem extends Tuple4<Long, Double, Double, String> {
 
-		public Long getOrderkey() { return this.f0; }
-		public Double getDiscount() { return this.f2; }
-		public Double getExtendedprice() { return this.f1; }
-		public String getShipdate() { return this.f3; }
+	private static class Lineitem extends Tuple4<Long, Double, Double, String> {
+
+		public Long getOrderkey() {
+			return this.f0;
+		}
+
+		public Double getDiscount() {
+			return this.f2;
+		}
+
+		public Double getExtendedprice() {
+			return this.f1;
+		}
+
+		public String getShipdate() {
+			return this.f3;
+		}
 	}
 
-	public static class Customer extends Tuple2<Long, String> {
-		
-		public Long getCustKey() { return this.f0; }
-		public String getMktsegment() { return this.f1; }
+	private static class Customer extends Tuple2<Long, String> {
+
+		public Long getCustKey() {
+			return this.f0;
+		}
+
+		public String getMktsegment() {
+			return this.f1;
+		}
 	}
 
-	public static class Order extends Tuple4<Long, Long, String, Long> {
-		
-		public Long getOrderKey() { return this.f0; }
-		public Long getCustKey() { return this.f1; }
-		public String getOrderdate() { return this.f2; }
-		public Long getShippriority() { return this.f3; }
+	private static class Order extends Tuple4<Long, Long, String, Long> {
+
+		public Long getOrderKey() {
+			return this.f0;
+		}
+
+		public Long getCustKey() {
+			return this.f1;
+		}
+
+		public String getOrderdate() {
+			return this.f2;
+		}
+
+		public Long getShippriority() {
+			return this.f3;
+		}
 	}
 
-	public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
+	private static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
+
+		public ShippingPriorityItem() {}
+
+		public ShippingPriorityItem(Long orderkey, Double revenue,
+				String orderdate, Long shippriority) {
+			this.f0 = orderkey;
+			this.f1 = revenue;
+			this.f2 = orderdate;
+			this.f3 = shippriority;
+		}
+
+		public Long getOrderkey() {
+			return this.f0;
+		}
 
-		public ShippingPriorityItem() { }
+		public void setOrderkey(Long orderkey) {
+			this.f0 = orderkey;
+		}
 
-		public ShippingPriorityItem(Long o_orderkey, Double revenue,
-				String o_orderdate, Long o_shippriority) {
-			this.f0 = o_orderkey;
+		public Double getRevenue() {
+			return this.f1;
+		}
+
+		public void setRevenue(Double revenue) {
 			this.f1 = revenue;
-			this.f2 = o_orderdate;
-			this.f3 = o_shippriority;
 		}
-		
-		public Long getOrderkey() { return this.f0; }
-		public void setOrderkey(Long orderkey) { this.f0 = orderkey; }
-		public Double getRevenue() { return this.f1; }
-		public void setRevenue(Double revenue) { this.f1 = revenue; }
-		
-		public String getOrderdate() { return this.f2; }
-		public Long getShippriority() { return this.f3; }
+
+		public String getOrderdate() {
+			return this.f2;
+		}
+
+		public Long getShippriority() {
+			return this.f3;
+		}
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env, String lineitemPath) {
 		return env.readCsvFile(lineitemPath)
 					.fieldDelimiter("|")
 					.includeFields("1000011000100000")
 					.tupleType(Lineitem.class);
 	}
-	
+
 	private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env, String customerPath) {
 		return env.readCsvFile(customerPath)
 					.fieldDelimiter("|")
 					.includeFields("10000010")
 					.tupleType(Customer.class);
 	}
-	
+
 	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env, String ordersPath) {
 		return env.readCsvFile(ordersPath)
 					.fieldDelimiter("|")

http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 5c8fac5..579d1ac 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -31,29 +31,28 @@ import org.apache.flink.examples.java.relational.util.WebLogData;
 import org.apache.flink.util.Collector;
 
 /**
- * This program processes web logs and relational data. 
+ * This program processes web logs and relational data.
  * It implements the following relational query:
  *
  * <pre>{@code
- * SELECT 
- *       r.pageURL, 
- *       r.pageRank, 
+ * SELECT
+ *       r.pageURL,
+ *       r.pageRank,
  *       r.avgDuration
  * FROM documents d JOIN rankings r
  *                  ON d.url = r.url
- * WHERE CONTAINS(d.text, [keywords]) 
- *       AND r.rank > [rank] 
- *       AND NOT EXISTS 
+ * WHERE CONTAINS(d.text, [keywords])
+ *       AND r.rank > [rank]
+ *       AND NOT EXISTS
  *           (
  *              SELECT * FROM Visits v
- *              WHERE v.destUrl = d.url 
+ *              WHERE v.destUrl = d.url
  *                    AND v.visitDate < [date]
  *           );
  * }</pre>
  *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator.
- * The tables referenced in the query can be generated using the {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and 
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator.
+ * The tables referenced in the query can be generated using the {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and
  * have the following schemas
  * <pre>{@code
  * CREATE TABLE Documents (
@@ -76,29 +75,26 @@ import org.apache.flink.util.Collector;
  *                searchWord VARCHAR(32),
  *                duration INT );
  * }</pre>
- * 
- * <p>
- * Usage: <code>WebLogAnalysis --documents &lt;path&gt; --ranks &lt;path&gt; --visits &lt;path&gt; --result &lt;path&gt;</code><br>
+ *
+ * <p>Usage: <code>WebLogAnalysis --documents &lt;path&gt; --ranks &lt;path&gt; --visits &lt;path&gt; --result &lt;path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from {@link WebLogData}.
- * 
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
  * <ul>
  * <li> tuple data types
  * <li> projection and join projection
  * <li> the CoGroup transformation for an anti-join
  * </ul>
- * 
  */
 @SuppressWarnings("serial")
 public class WebLogAnalysis {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
+
 		final ParameterTool params = ParameterTool.fromArgs(args);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -109,7 +105,7 @@ public class WebLogAnalysis {
 		DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env, params);
 		DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env, params);
 		DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env, params);
-		
+
 		// Retain documents with keywords
 		DataSet<Tuple1<String>> filterDocs = documents
 				.filter(new FilterDocByKeyWords())
@@ -125,19 +121,19 @@ public class WebLogAnalysis {
 				.project(0);
 
 		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
-		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks = 
+		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
 				filterDocs.join(filterRanks)
 							.where(0).equalTo(1)
-							.projectSecond(0,1,2);
+							.projectSecond(0, 1, 2);
 
 		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
-		DataSet<Tuple3<Integer, String, Integer>> result = 
+		DataSet<Tuple3<Integer, String, Integer>> result =
 				joinDocsRanks.coGroup(filterVisits)
 								.where(1).equalTo(0)
 								.with(new AntiJoinVisits());
 
 		// emit result
-		if(params.has("output")) {
+		if (params.has("output")) {
 			result.writeAsCsv(params.get("output"), "\n", "|");
 			// execute program
 			env.execute("WebLogAnalysis Example");
@@ -150,7 +146,7 @@ public class WebLogAnalysis {
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
-	
+
 	/**
 	 * MapFunction that filters for documents that contain a certain set of
 	 * keywords.
@@ -162,7 +158,7 @@ public class WebLogAnalysis {
 		/**
 		 * Filters for documents that contain all of the given keywords and projects the records on the URL field.
 		 *
-		 * Output Format:
+		 * <p>Output Format:
 		 * 0: URL
 		 * 1: DOCUMENT_TEXT
 		 */
@@ -191,7 +187,7 @@ public class WebLogAnalysis {
 		 * Filters for records of the rank relation where the rank is greater
 		 * than the given threshold.
 		 *
-		 * Output Format:
+		 * <p>Output Format:
 		 * 0: RANK
 		 * 1: URL
 		 * 2: AVG_DURATION
@@ -214,7 +210,7 @@ public class WebLogAnalysis {
 		 * Filters for records of the visits relation where the year of visit is equal to a
 		 * specified value. The URL of all visit records passing the filter is emitted.
 		 *
-		 * Output Format:
+		 * <p>Output Format:
 		 * 0: URL
 		 * 1: DATE
 		 */
@@ -222,7 +218,7 @@ public class WebLogAnalysis {
 		public boolean filter(Tuple2<String, String> value) throws Exception {
 			// Parse date string with the format YYYY-MM-DD and extract the year
 			String dateString = value.f1;
-			int year = Integer.parseInt(dateString.substring(0,4));
+			int year = Integer.parseInt(dateString.substring(0, 4));
 			return (year == YEARFILTER);
 		}
 	}
@@ -240,7 +236,7 @@ public class WebLogAnalysis {
 		 * If the visit iterator is empty, all pairs of the rank iterator are emitted.
 		 * Otherwise, no pair is emitted.
 		 *
-		 * Output Format:
+		 * <p>Output Format:
 		 * 0: RANK
 		 * 1: URL
 		 * 2: AVG_DURATION
@@ -260,10 +256,10 @@ public class WebLogAnalysis {
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for documents relation (URL, Doc-Text)
-		if(params.has("documents")) {
+		if (params.has("documents")) {
 			return env.readCsvFile(params.get("documents"))
 						.fieldDelimiter("|")
 						.types(String.class, String.class);
@@ -273,10 +269,10 @@ public class WebLogAnalysis {
 			return WebLogData.getDocumentDataSet(env);
 		}
 	}
-	
+
 	private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
-		if(params.has("ranks")) {
+		if (params.has("ranks")) {
 			return env.readCsvFile(params.get("ranks"))
 						.fieldDelimiter("|")
 						.types(Integer.class, String.class, Integer.class);
@@ -289,7 +285,7 @@ public class WebLogAnalysis {
 
 	private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env, ParameterTool params) {
 		// Create DataSet for visits relation (URL, Date)
-		if(params.has("visits")) {
+		if (params.has("visits")) {
 			return env.readCsvFile(params.get("visits"))
 						.fieldDelimiter("|")
 						.includeFields("011000000")
@@ -300,5 +296,5 @@ public class WebLogAnalysis {
 			return WebLogData.getVisitDataSet(env);
 		}
 	}
-		
+
 }