You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/29 22:58:39 UTC

git commit: Added TPCH query examples for Scala API Improved Java TPCH query examples Removed RelationalQuery Java API example

Repository: incubator-flink
Updated Branches:
  refs/heads/master 2b4d77929 -> ea4c8828c


Added TPCH query examples for Scala API
Improved Java TPCH query examples
Removed RelationalQuery Java API example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ea4c8828
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ea4c8828
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ea4c8828

Branch: refs/heads/master
Commit: ea4c8828c0d0170ccfe493494f06e6eb86ffdaf7
Parents: 2b4d779
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 29 17:53:39 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 29 22:13:07 2014 +0200

----------------------------------------------------------------------
 .../java/relational/RelationalQuery.java        | 171 -----------------
 .../examples/java/relational/TPCHQuery10.java   |  55 +++---
 .../examples/java/relational/TPCHQuery3.java    | 150 +++++++--------
 .../examples/scala/relational/TPCHQuery10.scala | 184 +++++++++++++++++++
 .../examples/scala/relational/TPCHQuery3.scala  | 172 +++++++++++++++++
 5 files changed, 440 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
deleted file mode 100644
index e54bb99..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/RelationalQuery.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements the following relational query on the TPC-H data set.
- * 
- * <p>
- * <code><pre>
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X"
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- *   GROUP BY l_orderkey, o_shippriority;
- * </pre></code>
- *        
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>RelationalQuery &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> build-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class RelationalQuery {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	private static String STATUS_FILTER = "F";
-	private static int YEAR_FILTER = 1993;
-	private static String OPRIO_FILTER = "5";
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
-		DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice)
-		DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter orders
-				orders.filter(
-								new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
-									@Override
-									public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
-										// status filter
-										if(!t.f1.equals(STATUS_FILTER)) {
-											return false;
-										// year filter
-										} else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
-											return false;
-										// order priority filter
-										} else if(!t.f3.startsWith(OPRIO_FILTER)) {
-											return false;
-										}
-										return true;
-									}
-								})
-				// project fields out that are no longer required
-				.project(0,4).types(Integer.class, Integer.class);
-
-		// join orders with lineitems: (orderkey, shippriority, extendedprice)
-		DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders = 
-				ordersFilteredByYear.joinWithHuge(lineitems)
-									.where(0).equalTo(0)
-									.projectFirst(0,1).projectSecond(1)
-									.types(Integer.class, Integer.class, Double.class);
-
-		// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
-		DataSet<Tuple3<Integer, Integer, Double>> priceSums =
-				// group by order and sum extendedprice
-				lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
-		// emit result
-		priceSums.writeAsCsv(outputPath);
-		
-		// execute program
-		env.execute("Relational Query Example");
-		
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 3) {
-				ordersPath = programArguments[0];
-				lineitemPath = programArguments[1];
-				outputPath = programArguments[2];
-			} else {
-				System.err.println("Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
-								"  Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple5<Integer, String, String, String, Integer>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter('|')
-					.includeFields("101011010")
-					.types(Integer.class, String.class, String.class, String.class, Integer.class);
-	}
-
-	private static DataSet<Tuple2<Integer, Double>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter('|')
-					.includeFields("1000010000000000")
-					.types(Integer.class, Double.class);
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index ff10b74..d3ba818 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -104,13 +104,10 @@ public class TPCHQuery10 {
 
 		// get customer data set: (custkey, name, address, nationkey, acctbal) 
 		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
 		// get orders data set: (orderkey, custkey, orderdate)
 		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
 		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
 		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
 		// get nation data set: (nationkey, name)
 		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
 
@@ -120,46 +117,38 @@ public class TPCHQuery10 {
 				orders.filter(
 								new FilterFunction<Tuple3<Integer,Integer, String>>() {
 									@Override
-									public boolean filter(Tuple3<Integer, Integer, String> t) {
-										int year = Integer.parseInt(t.f2.substring(0, 4));
-										return year > 1990;
+									public boolean filter(Tuple3<Integer, Integer, String> o) {
+										return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
 									}
 								})
 				// project fields out that are no longer required
 				.project(0,1).types(Integer.class, Integer.class);
 
-		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
+		// lineitems filtered by flag: (orderkey, revenue)
+		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag = 
 				// filter by flag
 				lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
 										@Override
-										public boolean filter(Tuple4<Integer, Double, Double, String> t)
-												throws Exception {
-											return t.f3.equals("R");
+										public boolean filter(Tuple4<Integer, Double, Double, String> l) {
+											return l.f3.equals("R");
 										}
 								})
-				// project fields out that are no longer required
-				.project(0,1,2).types(Integer.class, Double.class, Double.class);
-
-		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
-				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
-									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1,2)
-									.types(Integer.class, Double.class, Double.class);
-
-		// aggregate for revenue: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
-				// calculate the revenue for each item
-				.map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
+				// compute revenue and project out return flag
+				.map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple2<Integer, Double>>() {
 							@Override
-							public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
+							public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
 								// revenue per item = l_extendedprice * (1 - l_discount)
-								return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
+								return new Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
 							}
-					})
-				// aggregate the revenues per item to revenue per customer
-				.groupBy(0).aggregate(Aggregations.SUM, 1);
+					});
+
+		// join orders with lineitems: (custkey, revenue)
+		DataSet<Tuple2<Integer, Double>> revenueByCustomer = 
+				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
+									.where(0).equalTo(0)
+									.projectFirst(1).projectSecond(1)
+									.types(Integer.class, Double.class)
+									.groupBy(0).aggregate(Aggregations.SUM, 1);
 
 		// join customer with nation (custkey, name, address, nationname, acctbal)
 		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
@@ -169,14 +158,14 @@ public class TPCHQuery10 {
 						.types(Integer.class, String.class, String.class, String.class, Double.class);
 
 		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
-				customerWithNation.join(revenueOfCustomerKey)
+		DataSet<Tuple6<Integer, String, String, String, Double, Double>> result = 
+				customerWithNation.join(revenueByCustomer)
 				.where(0).equalTo(0)
 				.projectFirst(0,1,2,3,4).projectSecond(1)
 				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
 
 		// emit result
-		customerWithRevenue.writeAsCsv(outputPath);
+		result.writeAsCsv(outputPath, "\n", "|");
 		
 		// execute program
 		env.execute("TPCH Query 10 Example");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index 9402008..c10147c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -22,19 +22,15 @@ package org.apache.flink.examples.java.relational;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
 import java.util.Date;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 /**
  * This program implements a modified version of the TPC-H query 3. The
@@ -102,93 +98,72 @@ public class TPCHQuery3 {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		// get input data
-		DataSet<Lineitem> li = getLineitemDataSet(env);
-		DataSet<Order> or = getOrdersDataSet(env);
-		DataSet<Customer> cust = getCustomerDataSet(env);
+		DataSet<Lineitem> lineitems = getLineitemDataSet(env);
+		DataSet<Order> orders = getOrdersDataSet(env);
+		DataSet<Customer> customers = getCustomerDataSet(env);
 		
 		// Filter market segment "AUTOMOBILE"
-		cust = cust.filter(
-							new FilterFunction<Customer>() {
+		customers = customers.filter(
+								new FilterFunction<Customer>() {
+									@Override
+									public boolean filter(Customer c) {
+										return c.getMktsegment().equals("AUTOMOBILE");
+									}
+								});
+
+		// Filter all Orders with o_orderdate < 12.03.1995
+		orders = orders.filter(
+							new FilterFunction<Order>() {
+								private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+								private final Date date = format.parse("1995-03-12");
+								
 								@Override
-								public boolean filter(Customer value) {
-									return value.getMktsegment().equals("AUTOMOBILE");
+								public boolean filter(Order o) throws ParseException {
+									return format.parse(o.getOrderdate()).before(date);
 								}
 							});
-
-		// Filter all Orders with o_orderdate < 12.03.1995
-		or = or.filter(
-						new FilterFunction<Order>() {
-							private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-							private Date date;
-							
-							{	
-								Calendar cal = Calendar.getInstance();
-								cal.set(1995, 3, 12);
-								date = cal.getTime(); 
-							}
-							
-							@Override
-							public boolean filter(Order value) throws ParseException {
-								Date orderDate = format.parse(value.getOrderdate());
-								return orderDate.before(date);
-							}
-						});
 		
 		// Filter all Lineitems with l_shipdate > 12.03.1995
-		li = li.filter(
-						new FilterFunction<Lineitem>() {
-							private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-							private Date date;
-							
-							{
-								Calendar cal = Calendar.getInstance();
-								cal.set(1995, 3, 12);
-								date = cal.getTime();
-							}
-							
-							@Override
-							public boolean filter(Lineitem value) throws ParseException {
-								Date shipDate = format.parse(value.getShipdate());
-								return shipDate.after(date);
-							}
-						});
+		lineitems = lineitems.filter(
+								new FilterFunction<Lineitem>() {
+									private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+									private final Date date = format.parse("1995-03-12");
+									
+									@Override
+									public boolean filter(Lineitem l) throws ParseException {
+										return format.parse(l.getShipdate()).after(date);
+									}
+								});
 
 		// Join customers with orders and package them into a ShippingPriorityItem
 		DataSet<ShippingPriorityItem> customerWithOrders = 
-				cust.join(or)
-					.where(0)
-					.equalTo(0)
-					.with(
-							new JoinFunction<Customer, Order, ShippingPriorityItem>() {
-								@Override
-								public ShippingPriorityItem join(Customer first, Order second) {
-									return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
-											second.getShippriority(), second.getOrderkey());
-								}
-							});
+				customers.join(orders).where(0).equalTo(1)
+							.with(
+								new JoinFunction<Customer, Order, ShippingPriorityItem>() {
+									@Override
+									public ShippingPriorityItem join(Customer c, Order o) {
+										return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
+												o.getShippriority());
+									}
+								});
 		
 		// Join the last join result with Lineitems
-		DataSet<ShippingPriorityItem> joined = 
-				customerWithOrders.join(li)
-									.where(4)
-									.equalTo(0)
+		DataSet<ShippingPriorityItem> result = 
+				customerWithOrders.join(lineitems).where(0).equalTo(0)
 									.with(
 											new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
 												@Override
-												public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
-													first.setL_Orderkey(second.getOrderkey());
-													first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
-													return first;
+												public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
+													i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
+													return i;
 												}
-											});
-		
-		// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
-		joined = joined
-				.groupBy(0, 2, 3)
-				.aggregate(Aggregations.SUM, 1);
+											})
+								// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
+								.groupBy(0, 2, 3)
+								.aggregate(Aggregations.SUM, 1);
 		
 		// emit result
-		joined.writeAsCsv(outputPath, "\n", "|");
+		result.writeAsCsv(outputPath, "\n", "|");
 		
 		// execute program
 		env.execute("TPCH Query 3 Example");
@@ -213,34 +188,33 @@ public class TPCHQuery3 {
 		public String getMktsegment() { return this.f1; }
 	}
 
-	public static class Order extends Tuple3<Integer, String, Integer> {
+	public static class Order extends Tuple4<Integer, Integer, String, Integer> {
 		
-		public Integer getOrderkey() { return this.f0; }
-		public String getOrderdate() { return this.f1; }
-		public Integer getShippriority() { return this.f2; }
+		public Integer getOrderKey() { return this.f0; }
+		public Integer getCustKey() { return this.f1; }
+		public String getOrderdate() { return this.f2; }
+		public Integer getShippriority() { return this.f3; }
 	}
 
-	public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
+	public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
 
 		public ShippingPriorityItem() { }
 
-		public ShippingPriorityItem(Integer l_orderkey, Double revenue,
-				String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
-			this.f0 = l_orderkey;
+		public ShippingPriorityItem(Integer o_orderkey, Double revenue,
+				String o_orderdate, Integer o_shippriority) {
+			this.f0 = o_orderkey;
 			this.f1 = revenue;
 			this.f2 = o_orderdate;
 			this.f3 = o_shippriority;
-			this.f4 = o_orderkey;
 		}
 		
-		public Integer getL_Orderkey() { return this.f0; }
-		public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
+		public Integer getOrderkey() { return this.f0; }
+		public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
 		public Double getRevenue() { return this.f1; }
 		public void setRevenue(Double revenue) { this.f1 = revenue; }
 		
 		public String getOrderdate() { return this.f2; }
 		public Integer getShippriority() { return this.f3; }
-		public Integer getO_Orderkey() { return this.f4; }
 	}
 	
 	// *************************************************************************
@@ -291,7 +265,7 @@ public class TPCHQuery3 {
 	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
 		return env.readCsvFile(ordersPath)
 					.fieldDelimiter('|')
-					.includeFields("100010010")
+					.includeFields("110010010")
 					.tupleType(Order.class);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
new file mode 100644
index 0000000..d83c2fb
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 10. 
+ * 
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 45).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *        c_custkey,
+ *        c_name, 
+ *        c_address,
+ *        n_name, 
+ *        c_acctbal,
+ *        SUM(l_extendedprice * (1 - l_discount)) AS revenue
+ * FROM   
+ *        customer, 
+ *        orders, 
+ *        lineitem, 
+ *        nation 
+ * WHERE 
+ *        c_custkey = o_custkey 
+ *        AND l_orderkey = o_orderkey 
+ *        AND YEAR(o_orderdate) > '1990' 
+ *        AND l_returnflag = 'R' 
+ *        AND c_nationkey = n_nationkey 
+ * GROUP BY 
+ *        c_custkey, 
+ *        c_name, 
+ *        c_acctbal, 
+ *        n_name, 
+ *        c_address
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not print 
+ * c_phone and c_comment, only filters by years greater than 1990 instead of
+ * a period of 3 months, and does not sort the result by revenue.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at 
+ * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+ *
+ * Usage: 
+ * {{{
+ * TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - tuple data types
+ *  - built-in aggregation functions
+ *  - join with size hints
+ *  
+ */
+object TPCHQuery10 {
+
+  /** Entry point: runs the query only when parseParameters accepts the arguments. */
+  def main(args: Array[String]): Unit =
+    if (parseParameters(args)) runQuery()
+
+  /**
+   * Builds the TPC-H Q10 data flow over the configured input paths, writes the
+   * '|'-separated result to outputPath and executes the job.
+   */
+  private def runQuery(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // Sources, with the tuple layout each reader produces:
+    // customer (custkey, name, address, nationkey, acctbal)
+    val customerSet = getCustomerDataSet(env)
+    // orders (orderkey, custkey, orderdate)
+    val orderSet = getOrdersDataSet(env)
+    // lineitem (orderkey, extendedprice, discount, returnflag)
+    val lineitemSet = getLineitemDataSet(env)
+    // nation (nationkey, name)
+    val nationSet = getNationDataSet(env)
+
+    // (orderkey, custkey) of orders dated after 1990 (year = first four chars of orderdate)
+    val recentOrders = orderSet
+      .filter(order => order._3.substring(0, 4).toInt > 1990)
+      .map(order => (order._1, order._2))
+
+    // (orderkey, extendedprice * (1 - discount)) of line items flagged "R"
+    val returnedRevenue = lineitemSet
+      .filter(item => item._4.equals("R"))
+      .map(item => (item._1, item._2 * (1 - item._3)))
+
+    // (custkey, revenue): revenue of returned items, summed per ordering customer
+    val customerRevenue = recentOrders
+      .joinWithHuge(returnedRevenue).where(0).equalTo(0)
+      .apply((order, item) => (order._2, item._2))
+      .groupBy(0)
+      .aggregate(Aggregations.SUM, 1)
+
+    // (custkey, name, address, nation name, acctbal)
+    val customersWithNation = customerSet
+      .joinWithTiny(nationSet).where(3).equalTo(0)
+      .apply((cust, nation) => (cust._1, cust._2, cust._3, nation._2, cust._5))
+
+    // (custkey, name, address, nation name, acctbal, revenue)
+    val report = customersWithNation
+      .join(customerRevenue).where(0).equalTo(0)
+      .apply((cust, rev) => (cust._1, cust._2, cust._3, cust._4, cust._5, rev._2))
+
+    report.writeAsCsv(outputPath, "\n", "|")
+    env.execute("Scala TPCH Query 10 Example")
+  }
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  // Input and output locations; assigned by parseParameters before the query is built.
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var lineitemPath: String = null
+  private var nationPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Stores the five positional paths and returns true; for any other argument
+   * count prints usage information to stderr and returns false.
+   */
+  private def parseParameters(args: Array[String]): Boolean = args match {
+    case Array(customers, orders, lineitems, nations, output) =>
+      customerPath = customers
+      ordersPath = orders
+      lineitemPath = lineitems
+      nationPath = nations
+      outputPath = output
+      true
+    case _ =>
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          "  Due to legal restrictions, we can not ship generated data.\n" +
+          "  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " +
+          "<lineitem-csv path> <nation-csv path> <result path>")
+      false
+  }
+
+  // The readers below parse '|'-delimited files, keeping only the listed column indices.
+  private def getCustomerDataSet(env: ExecutionEnvironment)
+      : DataSet[(Int, String, String, Int, Double)] =
+    env.readCsvFile[(Int, String, String, Int, Double)](
+      customerPath, fieldDelimiter = '|', includedFields = Array(0, 1, 2, 3, 5))
+
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[(Int, Int, String)] =
+    env.readCsvFile[(Int, Int, String)](
+      ordersPath, fieldDelimiter = '|', includedFields = Array(0, 1, 4))
+
+  private def getLineitemDataSet(env: ExecutionEnvironment)
+      : DataSet[(Int, Double, Double, String)] =
+    env.readCsvFile[(Int, Double, Double, String)](
+      lineitemPath, fieldDelimiter = '|', includedFields = Array(0, 5, 6, 8))
+
+  private def getNationDataSet(env: ExecutionEnvironment): DataSet[(Int, String)] =
+    env.readCsvFile[(Int, String)](
+      nationPath, fieldDelimiter = '|', includedFields = Array(0, 1))
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea4c8828/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
new file mode 100644
index 0000000..6cea953
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields by using case classes.
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]] (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate. The market segment is fixed to `AUTOMOBILE` and the filter date is
+ * fixed to `1995-03-12`.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ *  - case classes and case class field addressing
+ *  - built-in aggregation functions
+ */
+object TPCHQuery3 {
+
+  /**
+   * Entry point. If the four path arguments are valid, builds and executes the
+   * query plan; otherwise only the usage message printed by `parseParameters`
+   * is emitted and nothing is executed.
+   */
+  def main(args: Array[String]): Unit = {
+    if (parseParameters(args)) {
+      // filter date; the date columns are parsed with the same yyyy-MM-dd pattern
+      val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+      val date = dateFormat.parse("1995-03-12")
+
+      // get execution environment
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      // read and filter lineitems by shipDate
+      val lineitems = getLineitemDataSet(env)
+        .filter(l => dateFormat.parse(l.shipDate).after(date))
+      // read and filter customers by market segment
+      val customers = getCustomerDataSet(env).filter(c => c.mktSegment == "AUTOMOBILE")
+      // read orders
+      val orders = getOrdersDataSet(env)
+
+      val items = orders
+        // filter orders by order date
+        .filter(o => dateFormat.parse(o.orderDate).before(date))
+        // filter orders by joining with customers; only the order side is kept
+        .join(customers).where("custId").equalTo("custId").apply((o, _) => o)
+        // join with lineitems and compute the discounted price of each item
+        .join(lineitems).where("orderId").equalTo("orderId")
+        .apply((o, l) =>
+          ShippedItem(o.orderId, l.extdPrice * (1.0 - l.discount), o.orderDate, o.shipPrio))
+
+      // group by order and aggregate revenue
+      val result = items.groupBy("orderId", "orderDate", "shipPrio")
+        .aggregate(Aggregations.SUM, "revenue")
+
+      // emit result: one record per line, fields separated by '|'
+      result.writeAsCsv(outputPath, "\n", "|")
+
+      // execute program
+      env.execute("Scala TPCH Query 3 Example")
+    }
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** Projected lineitem record: order key, extended price, discount and ship date. */
+  case class Lineitem(orderId: Integer, extdPrice: Double, discount: Double, shipDate: String)
+  /** Projected customer record: customer key and market segment. */
+  case class Customer(custId: Integer, mktSegment: String)
+  /** Projected order record: order key, customer key, order date and ship priority. */
+  case class Order(orderId: Integer, custId: Integer, orderDate: String, shipPrio: Integer)
+  /** Discounted price of one lineitem, tagged with the grouping attributes of its order. */
+  case class ShippedItem(orderId: Integer, revenue: Double, orderDate: String, shipPrio: Integer)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  // Input and output paths; assigned by parseParameters before the plan is built.
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Reads the lineitem, customer, orders and result paths (in that order) from `args`.
+   *
+   * @param args the program arguments
+   * @return true if exactly four arguments were given; otherwise prints the usage
+   *         message to stderr and returns false
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      // the trailing space after "<customer-csv path>" separates it from the next argument
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+                             "<orders-csv path> <result path>")
+      false
+    }
+  }
+
+  /** Reads columns 0, 5, 6 and 10 of the '|'-delimited lineitem file into [[Lineitem]]s. */
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = '|',
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  /** Reads columns 0 and 6 of the '|'-delimited customer file into [[Customer]]s. */
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = '|',
+        includedFields = Array(0, 6) )
+  }
+
+  /** Reads columns 0, 1, 4 and 7 of the '|'-delimited orders file into [[Order]]s. */
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = '|',
+        includedFields = Array(0, 1, 4, 7) )
+  }
+
+}