You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/29 22:58:39 UTC
git commit: Added TPCH query examples for Scala API Improved Java
TPCH query examples Removed RelationalQuery Java API example
Repository: incubator-flink
Updated Branches:
refs/heads/master 2b4d77929 -> ea4c8828c
Added TPCH query examples for Scala API
Improved Java TPCH query examples
Removed RelationalQuery Java API example
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ea4c8828
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ea4c8828
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ea4c8828
Branch: refs/heads/master
Commit: ea4c8828c0d0170ccfe493494f06e6eb86ffdaf7
Parents: 2b4d779
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 29 17:53:39 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 29 22:13:07 2014 +0200
----------------------------------------------------------------------
.../java/relational/RelationalQuery.java | 171 -----------------
.../examples/java/relational/TPCHQuery10.java | 55 +++---
.../examples/java/relational/TPCHQuery3.java | 150 +++++++--------
.../examples/scala/relational/TPCHQuery10.scala | 184 +++++++++++++++++++
.../examples/scala/relational/TPCHQuery3.scala | 172 +++++++++++++++++
5 files changed, 440 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
deleted file mode 100644
index e54bb99..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements the following relational query on the TPC-H data set.
- *
- * <p>
- * <code><pre>
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- * FROM orders, lineitem
- * WHERE l_orderkey = o_orderkey
- * AND o_orderstatus = "X"
- * AND YEAR(o_orderdate) > Y
- * AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- * </pre></code>
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>RelationalQuery <orders-csv path> <lineitem-csv path> <result path></code><br>
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> build-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class RelationalQuery {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- private static String STATUS_FILTER = "F";
- private static int YEAR_FILTER = 1993;
- private static String OPRIO_FILTER = "5";
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
- DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
-
- // get lineitem data set: (orderkey, extendedprice)
- DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
-
- // orders filtered by year: (orderkey, custkey)
- DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
- // filter orders
- orders.filter(
- new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
- @Override
- public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
- // status filter
- if(!t.f1.equals(STATUS_FILTER)) {
- return false;
- // year filter
- } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
- return false;
- // order priority filter
- } else if(!t.f3.startsWith(OPRIO_FILTER)) {
- return false;
- }
- return true;
- }
- })
- // project fields out that are no longer required
- .project(0,4).types(Integer.class, Integer.class);
-
- // join orders with lineitems: (orderkey, shippriority, extendedprice)
- DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
- ordersFilteredByYear.joinWithHuge(lineitems)
- .where(0).equalTo(0)
- .projectFirst(0,1).projectSecond(1)
- .types(Integer.class, Integer.class, Double.class);
-
- // extendedprice sums: (orderkey, shippriority, sum(extendedprice))
- DataSet<Tuple3<Integer, Integer, Double>> priceSums =
- // group by order and sum extendedprice
- lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
- // emit result
- priceSums.writeAsCsv(outputPath);
-
- // execute program
- env.execute("Relational Query Example");
-
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static String ordersPath;
- private static String lineitemPath;
- private static String outputPath;
-
- private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- if(programArguments.length == 3) {
- ordersPath = programArguments[0];
- lineitemPath = programArguments[1];
- outputPath = programArguments[2];
- } else {
- System.err.println("Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
- return false;
- }
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
- return false;
- }
- return true;
- }
-
- private static DataSet<Tuple5<Integer, String, String, String, Integer>> getOrdersDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(ordersPath)
- .fieldDelimiter('|')
- .includeFields("101011010")
- .types(Integer.class, String.class, String.class, String.class, Integer.class);
- }
-
- private static DataSet<Tuple2<Integer, Double>> getLineitemDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(lineitemPath)
- .fieldDelimiter('|')
- .includeFields("1000010000000000")
- .types(Integer.class, Double.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index ff10b74..d3ba818 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -104,13 +104,10 @@ public class TPCHQuery10 {
// get customer data set: (custkey, name, address, nationkey, acctbal)
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
// get orders data set: (orderkey, custkey, orderdate)
DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
// get nation data set: (nationkey, name)
DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
@@ -120,46 +117,38 @@ public class TPCHQuery10 {
orders.filter(
new FilterFunction<Tuple3<Integer,Integer, String>>() {
@Override
- public boolean filter(Tuple3<Integer, Integer, String> t) {
- int year = Integer.parseInt(t.f2.substring(0, 4));
- return year > 1990;
+ public boolean filter(Tuple3<Integer, Integer, String> o) {
+ return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
}
})
// project fields out that are no longer required
.project(0,1).types(Integer.class, Integer.class);
- // lineitems filtered by flag: (orderkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
+ // lineitems filtered by flag: (orderkey, revenue)
+ DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
// filter by flag
lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
@Override
- public boolean filter(Tuple4<Integer, Double, Double, String> t)
- throws Exception {
- return t.f3.equals("R");
+ public boolean filter(Tuple4<Integer, Double, Double, String> l) {
+ return l.f3.equals("R");
}
})
- // project fields out that are no longer required
- .project(0,1,2).types(Integer.class, Double.class, Double.class);
-
- // join orders with lineitems: (custkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
- ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
- .where(0).equalTo(0)
- .projectFirst(1).projectSecond(1,2)
- .types(Integer.class, Double.class, Double.class);
-
- // aggregate for revenue: (custkey, revenue)
- DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
- // calculate the revenue for each item
- .map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
+ // compute revenue and project out return flag
+ .map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple2<Integer, Double>>() {
@Override
- public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
+ public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
// revenue per item = l_extendedprice * (1 - l_discount)
- return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
+ return new Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
}
- })
- // aggregate the revenues per item to revenue per customer
- .groupBy(0).aggregate(Aggregations.SUM, 1);
+ });
+
+ // join orders with lineitems: (custkey, revenue)
+ DataSet<Tuple2<Integer, Double>> revenueByCustomer =
+ ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+ .where(0).equalTo(0)
+ .projectFirst(1).projectSecond(1)
+ .types(Integer.class, Double.class)
+ .groupBy(0).aggregate(Aggregations.SUM, 1);
// join customer with nation (custkey, name, address, nationname, acctbal)
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
@@ -169,14 +158,14 @@ public class TPCHQuery10 {
.types(Integer.class, String.class, String.class, String.class, Double.class);
// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
- DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
- customerWithNation.join(revenueOfCustomerKey)
+ DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
+ customerWithNation.join(revenueByCustomer)
.where(0).equalTo(0)
.projectFirst(0,1,2,3,4).projectSecond(1)
.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
// emit result
- customerWithRevenue.writeAsCsv(outputPath);
+ result.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("TPCH Query 10 Example");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index 9402008..c10147c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -22,19 +22,15 @@ package org.apache.flink.examples.java.relational;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
import java.util.Date;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
/**
* This program implements a modified version of the TPC-H query 3. The
@@ -102,93 +98,72 @@ public class TPCHQuery3 {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
- DataSet<Lineitem> li = getLineitemDataSet(env);
- DataSet<Order> or = getOrdersDataSet(env);
- DataSet<Customer> cust = getCustomerDataSet(env);
+ DataSet<Lineitem> lineitems = getLineitemDataSet(env);
+ DataSet<Order> orders = getOrdersDataSet(env);
+ DataSet<Customer> customers = getCustomerDataSet(env);
// Filter market segment "AUTOMOBILE"
- cust = cust.filter(
- new FilterFunction<Customer>() {
+ customers = customers.filter(
+ new FilterFunction<Customer>() {
+ @Override
+ public boolean filter(Customer c) {
+ return c.getMktsegment().equals("AUTOMOBILE");
+ }
+ });
+
+ // Filter all Orders with o_orderdate < 12.03.1995
+ orders = orders.filter(
+ new FilterFunction<Order>() {
+ private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+ private final Date date = format.parse("1995-03-12");
+
@Override
- public boolean filter(Customer value) {
- return value.getMktsegment().equals("AUTOMOBILE");
+ public boolean filter(Order o) throws ParseException {
+ return format.parse(o.getOrderdate()).before(date);
}
});
-
- // Filter all Orders with o_orderdate < 12.03.1995
- or = or.filter(
- new FilterFunction<Order>() {
- private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- private Date date;
-
- {
- Calendar cal = Calendar.getInstance();
- cal.set(1995, 3, 12);
- date = cal.getTime();
- }
-
- @Override
- public boolean filter(Order value) throws ParseException {
- Date orderDate = format.parse(value.getOrderdate());
- return orderDate.before(date);
- }
- });
// Filter all Lineitems with l_shipdate > 12.03.1995
- li = li.filter(
- new FilterFunction<Lineitem>() {
- private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- private Date date;
-
- {
- Calendar cal = Calendar.getInstance();
- cal.set(1995, 3, 12);
- date = cal.getTime();
- }
-
- @Override
- public boolean filter(Lineitem value) throws ParseException {
- Date shipDate = format.parse(value.getShipdate());
- return shipDate.after(date);
- }
- });
+ lineitems = lineitems.filter(
+ new FilterFunction<Lineitem>() {
+ private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+ private final Date date = format.parse("1995-03-12");
+
+ @Override
+ public boolean filter(Lineitem l) throws ParseException {
+ return format.parse(l.getShipdate()).after(date);
+ }
+ });
// Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
- cust.join(or)
- .where(0)
- .equalTo(0)
- .with(
- new JoinFunction<Customer, Order, ShippingPriorityItem>() {
- @Override
- public ShippingPriorityItem join(Customer first, Order second) {
- return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
- second.getShippriority(), second.getOrderkey());
- }
- });
+ customers.join(orders).where(0).equalTo(1)
+ .with(
+ new JoinFunction<Customer, Order, ShippingPriorityItem>() {
+ @Override
+ public ShippingPriorityItem join(Customer c, Order o) {
+ return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
+ o.getShippriority());
+ }
+ });
// Join the last join result with Lineitems
- DataSet<ShippingPriorityItem> joined =
- customerWithOrders.join(li)
- .where(4)
- .equalTo(0)
+ DataSet<ShippingPriorityItem> result =
+ customerWithOrders.join(lineitems).where(0).equalTo(0)
.with(
new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@Override
- public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
- first.setL_Orderkey(second.getOrderkey());
- first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
- return first;
+ public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
+ i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
+ return i;
}
- });
-
- // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
- joined = joined
- .groupBy(0, 2, 3)
- .aggregate(Aggregations.SUM, 1);
+ })
+ // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
+ .groupBy(0, 2, 3)
+ .aggregate(Aggregations.SUM, 1);
// emit result
- joined.writeAsCsv(outputPath, "\n", "|");
+ result.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("TPCH Query 3 Example");
@@ -213,34 +188,33 @@ public class TPCHQuery3 {
public String getMktsegment() { return this.f1; }
}
- public static class Order extends Tuple3<Integer, String, Integer> {
+ public static class Order extends Tuple4<Integer, Integer, String, Integer> {
- public Integer getOrderkey() { return this.f0; }
- public String getOrderdate() { return this.f1; }
- public Integer getShippriority() { return this.f2; }
+ public Integer getOrderKey() { return this.f0; }
+ public Integer getCustKey() { return this.f1; }
+ public String getOrderdate() { return this.f2; }
+ public Integer getShippriority() { return this.f3; }
}
- public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
+ public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
public ShippingPriorityItem() { }
- public ShippingPriorityItem(Integer l_orderkey, Double revenue,
- String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
- this.f0 = l_orderkey;
+ public ShippingPriorityItem(Integer o_orderkey, Double revenue,
+ String o_orderdate, Integer o_shippriority) {
+ this.f0 = o_orderkey;
this.f1 = revenue;
this.f2 = o_orderdate;
this.f3 = o_shippriority;
- this.f4 = o_orderkey;
}
- public Integer getL_Orderkey() { return this.f0; }
- public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
+ public Integer getOrderkey() { return this.f0; }
+ public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
public Double getRevenue() { return this.f1; }
public void setRevenue(Double revenue) { this.f1 = revenue; }
public String getOrderdate() { return this.f2; }
public Integer getShippriority() { return this.f3; }
- public Integer getO_Orderkey() { return this.f4; }
}
// *************************************************************************
@@ -291,7 +265,7 @@ public class TPCHQuery3 {
private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
return env.readCsvFile(ordersPath)
.fieldDelimiter('|')
- .includeFields("100010010")
+ .includeFields("110010010")
.tupleType(Order.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
new file mode 100644
index 0000000..d83c2fb
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 10.
+ *
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
+ * (page 45).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ * c_custkey,
+ * c_name,
+ * c_address,
+ * n_name,
+ * c_acctbal,
+ * SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM
+ * customer,
+ * orders,
+ * lineitem,
+ * nation
+ * WHERE
+ * c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND YEAR(o_orderdate) > '1990'
+ * AND l_returnflag = 'R'
+ * AND c_nationkey = n_nationkey
+ * GROUP BY
+ * c_custkey,
+ * c_name,
+ * c_acctbal,
+ * n_name,
+ * c_address
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not print
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ * - tuple data types
+ * - built-in aggregation functions
+ * - join with size hints
+ *
+ */
+object TPCHQuery10 {
+  /** Runs the query; prints usage and returns without executing if the arguments are invalid. */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // get customer data set: (custkey, name, address, nationkey, acctbal)
+    val customers = getCustomerDataSet(env)
+    // get orders data set: (orderkey, custkey, orderdate)
+    val orders = getOrdersDataSet(env)
+    // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
+    val lineitems = getLineitemDataSet(env)
+    // get nation data set: (nationkey, name)
+    val nations = getNationDataSet(env)
+
+    // keep orders whose orderdate year (first 4 chars) is after 1990: (orderkey, custkey)
+    val orders1990 = orders.filter(o => o._3.substring(0, 4).toInt > 1990)
+                           .map(o => (o._1, o._2))
+
+    // keep returned lineitems (flag "R"): (orderkey, extendedprice * (1 - discount))
+    val lineitemsReturn = lineitems.filter(l => l._4 == "R")
+                                   .map(l => (l._1, l._2 * (1 - l._3)))
+
+    // join on orderkey and sum revenue per customer: (custkey, revenue)
+    val revenueByCustomer = orders1990.joinWithHuge(lineitemsReturn).where(0).equalTo(0)
+                                      .apply((o, l) => (o._2, l._2))
+                                      .groupBy(0)
+                                      .aggregate(Aggregations.SUM, 1)
+
+    // join customer with nation name, then with revenue: (custkey, name, address, nation, acctbal, revenue)
+    val result = customers.joinWithTiny(nations).where(3).equalTo(0)
+                          .apply((c, n) => (c._1, c._2, c._3, n._2, c._5))
+                          .join(revenueByCustomer).where(0).equalTo(0)
+                          .apply((c, r) => (c._1, c._2, c._3, c._4, c._5, r._2))
+    // emit result: one line per customer, fields separated by '|'
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 10 Example")
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  // Input/output paths; assigned by parseParameters before main builds the program.
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var lineitemPath: String = null
+  private var nationPath: String = null
+  private var outputPath: String = null
+
+  /** Stores the five paths from `args`; otherwise prints usage to stderr and returns false. */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 5) {
+      customerPath = args(0)
+      ordersPath = args(1)
+      lineitemPath = args(2)
+      nationPath = args(3)
+      outputPath = args(4)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+        " Due to legal restrictions, we can not ship generated data.\n" +
+        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+        " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " +
+        "<lineitem-csv path> <nation-csv path> <result path>")
+      false
+    }
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment):
+    DataSet[Tuple5[Int, String, String, Int, Double]] = {
+    env.readCsvFile[Tuple5[Int, String, String, Int, Double]](
+      customerPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 1, 2, 3, 5) ) // custkey, name, address, nationkey, acctbal
+  }
+
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = {
+    env.readCsvFile[Tuple3[Int, Int, String]](
+      ordersPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 1, 4) ) // orderkey, custkey, orderdate
+  }
+
+  private def getLineitemDataSet(env: ExecutionEnvironment):
+    DataSet[Tuple4[Int, Double, Double, String]] = {
+    env.readCsvFile[Tuple4[Int, Double, Double, String]](
+      lineitemPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 5, 6, 8) ) // orderkey, extendedprice, discount, returnflag
+  }
+
+  private def getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = {
+    env.readCsvFile[Tuple2[Int, String]](
+      nationPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 1) ) // nationkey, name
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
new file mode 100644
index 0000000..6cea953
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields by using Scala case classes.
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ * l_orderkey,
+ * SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ * o_orderdate,
+ * o_shippriority
+ * FROM customer,
+ * orders,
+ * lineitem
+ * WHERE
+ * c_mktsegment = '[SEGMENT]'
+ * AND c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND o_orderdate < date '[DATE]'
+ * AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ * l_orderkey,
+ * o_orderdate,
+ * o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ * - case classes and case class field addressing
+ * - built-in aggregation functions
+ *
+ */
+object TPCHQuery3 {
+  /** Runs the query; prints usage and returns without executing if the arguments are invalid. */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date (orders placed before it, lineitems shipped after it)
+    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read and filter lineitems by shipDate
+    val lineitems = getLineitemDataSet(env).filter(l => dateFormat.parse(l.shipDate).after(date))
+    // read and filter customers by market segment
+    val customers = getCustomerDataSet(env).filter(c => c.mktSegment == "AUTOMOBILE")
+    // read orders
+    val orders = getOrdersDataSet(env)
+
+    // filter orders by order date
+    val items = orders.filter(o => dateFormat.parse(o.orderDate).before(date))
+      // keep only orders of the selected customers
+      .join(customers).where("custId").equalTo("custId").apply((o, c) => o)
+      // join with lineitems and compute revenue per item: extdPrice * (1 - discount)
+      .join(lineitems).where("orderId").equalTo("orderId")
+      .apply((o, l) =>
+        ShippedItem(o.orderId,
+                    l.extdPrice * (1.0 - l.discount),
+                    o.orderDate,
+                    o.shipPrio))
+
+    // group by order and sum the revenue of its items
+    val result = items.groupBy("orderId", "orderDate", "shipPrio")
+                      .aggregate(Aggregations.SUM, "revenue")
+
+    // emit result: one line per order, fields separated by '|'
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 Example")
+  }
+
+  // *************************************************************************
+  // USER DATA TYPES
+  // *************************************************************************
+  // Field names are used as string keys in main's where/equalTo/groupBy/aggregate calls.
+  case class Lineitem(orderId: Integer, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(custId: Integer, mktSegment: String)
+  case class Order(orderId: Integer, custId: Integer, orderDate: String, shipPrio: Integer)
+  case class ShippedItem(orderId: Integer, revenue: Double, orderDate: String, shipPrio: Integer)
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+  // Input/output paths; assigned by parseParameters before main builds the program.
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  /** Stores the four paths from `args`; otherwise prints usage to stderr and returns false. */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+        " Due to legal restrictions, we can not ship generated data.\n" +
+        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+        " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+        "<orders-csv path> <result path>")
+      false
+    }
+  }
+
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+      lineitemPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 5, 6, 10) ) // l_orderkey, l_extendedprice, l_discount, l_shipdate
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+      customerPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 6) ) // c_custkey, c_mktsegment
+  }
+
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+      ordersPath,
+      fieldDelimiter = '|',
+      includedFields = Array(0, 1, 4, 7) ) // o_orderkey, o_custkey, o_orderdate, o_shippriority
+  }
+}