You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 11:04:12 UTC
[3/7] flink git commit: [FLINK-6707] [examples] Activate strict
checkstyle for flink-examples
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index 14fbc34..c585e82 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -20,67 +20,60 @@ package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
/**
* This program implements a modified version of the TPC-H query 10.
* The original query can be found at
* <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- *
- * <p>
- * This program implements the following SQL equivalent:
- *
- * <p>
- * <pre>{@code
- * SELECT
+ *
+ * <p>This program implements the following SQL equivalent:
+ *
+ * <p><pre>{@code
+ * SELECT
* c_custkey,
- * c_name,
+ * c_name,
* c_address,
- * n_name,
+ * n_name,
* c_acctbal,
- * SUM(l_extendedprice * (1 - l_discount)) AS revenue,
- * FROM
- * customer,
- * orders,
- * lineitem,
- * nation
- * WHERE
- * c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND YEAR(o_orderdate) > '1990'
- * AND l_returnflag = 'R'
- * AND c_nationkey = n_nationkey
- * GROUP BY
- * c_custkey,
- * c_name,
- * c_acctbal,
- * n_name,
+ * SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM
+ * customer,
+ * orders,
+ * lineitem,
+ * nation
+ * WHERE
+ * c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND YEAR(o_orderdate) > '1990'
+ * AND l_returnflag = 'R'
+ * AND c_nationkey = n_nationkey
+ * GROUP BY
+ * c_custkey,
+ * c_name,
+ * c_acctbal,
+ * n_name,
* c_address
* }</pre>
- *
- * <p>
- * Compared to the original TPC-H query this version does not print
+ *
+ * <p>Compared to the original TPC-H query this version does not print
* c_phone and c_comment, only filters by years greater than 1990 instead of
* a period of 3 months, and does not sort the result by revenue.
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
+ *
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>TPCHQuery10 --customer <path> --orders <path> --lineitem<path> --nation <path> --output <path></code><br>
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path></code><br>
+ *
+ * <p>This example shows how to use:
* <ul>
* <li> tuple data types
* <li> inline-defined functions
@@ -90,11 +83,11 @@ import org.apache.flink.api.java.utils.ParameterTool;
*/
@SuppressWarnings("serial")
public class TPCHQuery10 {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
@@ -109,7 +102,7 @@ public class TPCHQuery10 {
return;
}
- // get customer data set: (custkey, name, address, nationkey, acctbal)
+ // get customer data set: (custkey, name, address, nationkey, acctbal)
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers =
getCustomerDataSet(env, params.get("customer"));
// get orders data set: (orderkey, custkey, orderdate)
@@ -126,17 +119,17 @@ public class TPCHQuery10 {
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
// filter by year
orders.filter(
- new FilterFunction<Tuple3<Integer,Integer, String>>() {
+ new FilterFunction<Tuple3<Integer, Integer, String>>() {
@Override
public boolean filter(Tuple3<Integer, Integer, String> o) {
return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
}
})
// project fields out that are no longer required
- .project(0,1);
+ .project(0, 1);
// lineitems filtered by flag: (orderkey, revenue)
- DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
+ DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
// filter by flag
lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
@Override
@@ -154,24 +147,24 @@ public class TPCHQuery10 {
});
// join orders with lineitems: (custkey, revenue)
- DataSet<Tuple2<Integer, Double>> revenueByCustomer =
+ DataSet<Tuple2<Integer, Double>> revenueByCustomer =
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1);
-
+
revenueByCustomer = revenueByCustomer.groupBy(0).aggregate(Aggregations.SUM, 1);
// join customer with nation (custkey, name, address, nationname, acctbal)
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
.joinWithTiny(nations)
.where(3).equalTo(0)
- .projectFirst(0,1,2).projectSecond(1).projectFirst(4);
+ .projectFirst(0, 1, 2).projectSecond(1).projectFirst(4);
// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
- DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
+ DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
customerWithNation.join(revenueByCustomer)
.where(0).equalTo(0)
- .projectFirst(0,1,2,3,4).projectSecond(1);
+ .projectFirst(0, 1, 2, 3, 4).projectSecond(1);
// emit result
if (params.has("output")) {
@@ -182,20 +175,20 @@ public class TPCHQuery10 {
System.out.println("Printing result to stdout. Use --output to specify output path.");
result.print();
}
-
+
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
/**
 * Reads the customer table from a pipe-delimited CSV file.
 *
 * <p>The include mask selects five of the eight columns (those marked '1'), which are
 * parsed as (Integer, String, String, Integer, Double). The caller in main() describes
 * the resulting tuple as (custkey, name, address, nationkey, acctbal).
 *
 * @param env execution environment used to create the CSV source
 * @param customerPath path of the customer input file
 * @return data set holding the selected customer columns
 */
private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env, String customerPath) {
return env.readCsvFile(customerPath)
.fieldDelimiter("|")
// keep columns 0-3 and 5; skip columns 4, 6 and 7
.includeFields("11110100")
.types(Integer.class, String.class, String.class, Integer.class, Double.class);
}
-
+
private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env, String ordersPath) {
return env.readCsvFile(ordersPath)
.fieldDelimiter("|")
@@ -209,7 +202,7 @@ public class TPCHQuery10 {
.includeFields("1000011010000000")
.types(Integer.class, Double.class, Double.class, String.class);
}
-
+
private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env, String nationPath) {
return env.readCsvFile(nationPath)
.fieldDelimiter("|")
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index c849764..f416f30 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -18,11 +18,6 @@
package org.apache.flink.examples.java.relational;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
@@ -32,50 +27,49 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
/**
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to assign names to fields by extending the Tuple class.
* The original query can be found at
* <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
*
- * <p>
- * This program implements the following SQL equivalent:
+ * <p>This program implements the following SQL equivalent:
*
- * <p>
- * <pre>{@code
- * SELECT
- * l_orderkey,
+ * <p><pre>{@code
+ * SELECT
+ * l_orderkey,
* SUM(l_extendedprice*(1-l_discount)) AS revenue,
- * o_orderdate,
- * o_shippriority
- * FROM customer,
- * orders,
- * lineitem
+ * o_orderdate,
+ * o_shippriority
+ * FROM customer,
+ * orders,
+ * lineitem
* WHERE
- * c_mktsegment = '[SEGMENT]'
+ * c_mktsegment = '[SEGMENT]'
* AND c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND o_orderdate < date '[DATE]'
* AND l_shipdate > date '[DATE]'
* GROUP BY
- * l_orderkey,
- * o_orderdate,
+ * l_orderkey,
+ * o_orderdate,
* o_shippriority;
* }</pre>
*
- * <p>
- * Compared to the original TPC-H query this version does not sort the result by revenue
+ * <p>Compared to the original TPC-H query this version does not sort the result by revenue
* and orderdate.
*
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
*
- * <p>
- * Usage: <code>TPCHQuery3 --lineitem<path> --customer <path> --orders<path> --output <path></code><br>
- *
- * <p>
- * This example shows how to use:
+ * <p>Usage: <code>TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path></code><br>
+ *
+ * <p>This example shows how to use:
* <ul>
* <li> custom data type derived from tuple data types
* <li> inline-defined functions
@@ -88,9 +82,9 @@ public class TPCHQuery3 {
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
+
final ParameterTool params = ParameterTool.fromArgs(args);
if (!params.has("lineitem") && !params.has("customer") && !params.has("orders")) {
@@ -109,7 +103,7 @@ public class TPCHQuery3 {
DataSet<Lineitem> lineitems = getLineitemDataSet(env, params.get("lineitem"));
DataSet<Order> orders = getOrdersDataSet(env, params.get("customer"));
DataSet<Customer> customers = getCustomerDataSet(env, params.get("orders"));
-
+
// Filter market segment "AUTOMOBILE"
customers = customers.filter(
new FilterFunction<Customer>() {
@@ -124,19 +118,19 @@ public class TPCHQuery3 {
new FilterFunction<Order>() {
private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private final Date date = format.parse("1995-03-12");
-
+
@Override
public boolean filter(Order o) throws ParseException {
return format.parse(o.getOrderdate()).before(date);
}
});
-
+
// Filter all Lineitems with l_shipdate > 12.03.1995
lineitems = lineitems.filter(
new FilterFunction<Lineitem>() {
private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private final Date date = format.parse("1995-03-12");
-
+
@Override
public boolean filter(Lineitem l) throws ParseException {
return format.parse(l.getShipdate()).after(date);
@@ -144,7 +138,7 @@ public class TPCHQuery3 {
});
// Join customers with orders and package them into a ShippingPriorityItem
- DataSet<ShippingPriorityItem> customerWithOrders =
+ DataSet<ShippingPriorityItem> customerWithOrders =
customers.join(orders).where(0).equalTo(1)
.with(
new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@@ -154,9 +148,9 @@ public class TPCHQuery3 {
o.getShippriority());
}
});
-
+
// Join the last join result with Lineitems
- DataSet<ShippingPriorityItem> result =
+ DataSet<ShippingPriorityItem> result =
customerWithOrders.join(lineitems).where(0).equalTo(0)
.with(
new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@@ -169,7 +163,7 @@ public class TPCHQuery3 {
// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
.groupBy(0, 2, 3)
.aggregate(Aggregations.SUM, 1);
-
+
// emit result
if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", "|");
@@ -185,68 +179,111 @@ public class TPCHQuery3 {
// *************************************************************************
// DATA TYPES
// *************************************************************************
-
- public static class Lineitem extends Tuple4<Long, Double, Double, String> {
- public Long getOrderkey() { return this.f0; }
- public Double getDiscount() { return this.f2; }
- public Double getExtendedprice() { return this.f1; }
- public String getShipdate() { return this.f3; }
+ private static class Lineitem extends Tuple4<Long, Double, Double, String> {
+
+ public Long getOrderkey() {
+ return this.f0;
+ }
+
+ public Double getDiscount() {
+ return this.f2;
+ }
+
+ public Double getExtendedprice() {
+ return this.f1;
+ }
+
+ public String getShipdate() {
+ return this.f3;
+ }
}
- public static class Customer extends Tuple2<Long, String> {
-
- public Long getCustKey() { return this.f0; }
- public String getMktsegment() { return this.f1; }
+ private static class Customer extends Tuple2<Long, String> {
+
+ public Long getCustKey() {
+ return this.f0;
+ }
+
+ public String getMktsegment() {
+ return this.f1;
+ }
}
- public static class Order extends Tuple4<Long, Long, String, Long> {
-
- public Long getOrderKey() { return this.f0; }
- public Long getCustKey() { return this.f1; }
- public String getOrderdate() { return this.f2; }
- public Long getShippriority() { return this.f3; }
+ private static class Order extends Tuple4<Long, Long, String, Long> {
+
+ public Long getOrderKey() {
+ return this.f0;
+ }
+
+ public Long getCustKey() {
+ return this.f1;
+ }
+
+ public String getOrderdate() {
+ return this.f2;
+ }
+
+ public Long getShippriority() {
+ return this.f3;
+ }
}
- public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
+ private static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
+
+ public ShippingPriorityItem() {}
+
+ public ShippingPriorityItem(Long orderkey, Double revenue,
+ String orderdate, Long shippriority) {
+ this.f0 = orderkey;
+ this.f1 = revenue;
+ this.f2 = orderdate;
+ this.f3 = shippriority;
+ }
+
+ public Long getOrderkey() {
+ return this.f0;
+ }
- public ShippingPriorityItem() { }
+ public void setOrderkey(Long orderkey) {
+ this.f0 = orderkey;
+ }
- public ShippingPriorityItem(Long o_orderkey, Double revenue,
- String o_orderdate, Long o_shippriority) {
- this.f0 = o_orderkey;
+ public Double getRevenue() {
+ return this.f1;
+ }
+
+ public void setRevenue(Double revenue) {
this.f1 = revenue;
- this.f2 = o_orderdate;
- this.f3 = o_shippriority;
}
-
- public Long getOrderkey() { return this.f0; }
- public void setOrderkey(Long orderkey) { this.f0 = orderkey; }
- public Double getRevenue() { return this.f1; }
- public void setRevenue(Double revenue) { this.f1 = revenue; }
-
- public String getOrderdate() { return this.f2; }
- public Long getShippriority() { return this.f3; }
+
+ public String getOrderdate() {
+ return this.f2;
+ }
+
+ public Long getShippriority() {
+ return this.f3;
+ }
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
/**
 * Reads the lineitem table from a pipe-delimited CSV file into {@code Lineitem} tuples.
 *
 * <p>The include mask selects four columns (those marked '1'), which fill the
 * {@code Lineitem} fields f0..f3 in order. The {@code Lineitem} getters expose these as
 * orderkey, extendedprice, discount and shipdate.
 *
 * @param env execution environment used to create the CSV source
 * @param lineitemPath path of the lineitem input file
 * @return data set of {@code Lineitem} records
 */
private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env, String lineitemPath) {
return env.readCsvFile(lineitemPath)
.fieldDelimiter("|")
// keep columns 0, 5, 6 and 10; skip all others
.includeFields("1000011000100000")
.tupleType(Lineitem.class);
}
-
+
/**
 * Reads the customer table from a pipe-delimited CSV file into {@code Customer} tuples.
 *
 * <p>The include mask selects two columns (those marked '1'), which fill the
 * {@code Customer} fields f0 and f1. The {@code Customer} getters expose these as
 * custKey and mktsegment.
 *
 * <p>NOTE(review): main() calls this method with {@code params.get("orders")} and calls
 * getOrdersDataSet with {@code params.get("customer")}; the two arguments look swapped.
 * Confirm against the intended command-line usage.
 *
 * @param env execution environment used to create the CSV source
 * @param customerPath path of the customer input file
 * @return data set of {@code Customer} records
 */
private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env, String customerPath) {
return env.readCsvFile(customerPath)
.fieldDelimiter("|")
// keep columns 0 and 6; skip all others
.includeFields("10000010")
.tupleType(Customer.class);
}
-
+
private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env, String ordersPath) {
return env.readCsvFile(ordersPath)
.fieldDelimiter("|")
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 5c8fac5..579d1ac 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -31,29 +31,28 @@ import org.apache.flink.examples.java.relational.util.WebLogData;
import org.apache.flink.util.Collector;
/**
- * This program processes web logs and relational data.
+ * This program processes web logs and relational data.
* It implements the following relational query:
*
* <pre>{@code
- * SELECT
- * r.pageURL,
- * r.pageRank,
+ * SELECT
+ * r.pageURL,
+ * r.pageRank,
* r.avgDuration
* FROM documents d JOIN rankings r
* ON d.url = r.url
- * WHERE CONTAINS(d.text, [keywords])
- * AND r.rank > [rank]
- * AND NOT EXISTS
+ * WHERE CONTAINS(d.text, [keywords])
+ * AND r.rank > [rank]
+ * AND NOT EXISTS
* (
* SELECT * FROM Visits v
- * WHERE v.destUrl = d.url
+ * WHERE v.destUrl = d.url
* AND v.visitDate < [date]
* );
* }</pre>
*
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator.
- * The tables referenced in the query can be generated using the {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and
+ * <p>Input files are plain text CSV files using the pipe character ('|') as field separator.
+ * The tables referenced in the query can be generated using the {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and
* have the following schemas
* <pre>{@code
* CREATE TABLE Documents (
@@ -76,29 +75,26 @@ import org.apache.flink.util.Collector;
* searchWord VARCHAR(32),
* duration INT );
* }</pre>
- *
- * <p>
- * Usage: <code>WebLogAnalysis --documents <path> --ranks <path> --visits <path> --result <path></code><br>
+ *
+ * <p>Usage: <code>WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path></code><br>
* If no parameters are provided, the program is run with default data from {@link WebLogData}.
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
* <ul>
* <li> tuple data types
* <li> projection and join projection
* <li> the CoGroup transformation for an anti-join
* </ul>
- *
*/
@SuppressWarnings("serial")
public class WebLogAnalysis {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
+
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -109,7 +105,7 @@ public class WebLogAnalysis {
DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env, params);
DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env, params);
DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env, params);
-
+
// Retain documents with keywords
DataSet<Tuple1<String>> filterDocs = documents
.filter(new FilterDocByKeyWords())
@@ -125,19 +121,19 @@ public class WebLogAnalysis {
.project(0);
// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
- DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
+ DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
filterDocs.join(filterRanks)
.where(0).equalTo(1)
- .projectSecond(0,1,2);
+ .projectSecond(0, 1, 2);
// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
- DataSet<Tuple3<Integer, String, Integer>> result =
+ DataSet<Tuple3<Integer, String, Integer>> result =
joinDocsRanks.coGroup(filterVisits)
.where(1).equalTo(0)
.with(new AntiJoinVisits());
// emit result
- if(params.has("output")) {
+ if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", "|");
// execute program
env.execute("WebLogAnalysis Example");
@@ -150,7 +146,7 @@ public class WebLogAnalysis {
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
-
+
/**
* FilterFunction that filters for documents that contain a certain set of
* keywords.
@@ -162,7 +158,7 @@ public class WebLogAnalysis {
/**
* Filters for documents that contain all of the given keywords and projects the records on the URL field.
*
- * Output Format:
+ * <p>Output Format:
* 0: URL
* 1: DOCUMENT_TEXT
*/
@@ -191,7 +187,7 @@ public class WebLogAnalysis {
* Filters for records of the rank relation where the rank is greater
* than the given threshold.
*
- * Output Format:
+ * <p>Output Format:
* 0: RANK
* 1: URL
* 2: AVG_DURATION
@@ -214,7 +210,7 @@ public class WebLogAnalysis {
* Filters for records of the visits relation where the year of visit is equal to a
* specified value. The URL of all visit records passing the filter is emitted.
*
- * Output Format:
+ * <p>Output Format:
* 0: URL
* 1: DATE
*/
@@ -222,7 +218,7 @@ public class WebLogAnalysis {
public boolean filter(Tuple2<String, String> value) throws Exception {
// Parse date string with the format YYYY-MM-DD and extract the year
String dateString = value.f1;
- int year = Integer.parseInt(dateString.substring(0,4));
+ int year = Integer.parseInt(dateString.substring(0, 4));
return (year == YEARFILTER);
}
}
@@ -240,7 +236,7 @@ public class WebLogAnalysis {
* If the visit iterator is empty, all pairs of the rank iterator are emitted.
* Otherwise, no pair is emitted.
*
- * Output Format:
+ * <p>Output Format:
* 0: RANK
* 1: URL
* 2: AVG_DURATION
@@ -260,10 +256,10 @@ public class WebLogAnalysis {
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env, ParameterTool params) {
// Create DataSet for documents relation (URL, Doc-Text)
- if(params.has("documents")) {
+ if (params.has("documents")) {
return env.readCsvFile(params.get("documents"))
.fieldDelimiter("|")
.types(String.class, String.class);
@@ -273,10 +269,10 @@ public class WebLogAnalysis {
return WebLogData.getDocumentDataSet(env);
}
}
-
+
private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env, ParameterTool params) {
// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
- if(params.has("ranks")) {
+ if (params.has("ranks")) {
return env.readCsvFile(params.get("ranks"))
.fieldDelimiter("|")
.types(Integer.class, String.class, Integer.class);
@@ -289,7 +285,7 @@ public class WebLogAnalysis {
private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env, ParameterTool params) {
// Create DataSet for visits relation (URL, Date)
- if(params.has("visits")) {
+ if (params.has("visits")) {
return env.readCsvFile(params.get("visits"))
.fieldDelimiter("|")
.includeFields("011000000")
@@ -300,5 +296,5 @@ public class WebLogAnalysis {
return WebLogData.getVisitDataSet(env);
}
}
-
+
}