You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/02 14:25:45 UTC

[1/2] flink git commit: [FLINK-4180] [FLINK-4181] [table] Ensure examples consistency

Repository: flink
Updated Branches:
  refs/heads/master 45df1b25b -> a1fef27b9


[FLINK-4180] [FLINK-4181] [table] Ensure examples consistency


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1fef27b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1fef27b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1fef27b

Branch: refs/heads/master
Commit: a1fef27b9c9ddd03b3de9bfe35f1d9e7d973c756
Parents: 123c637e
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 2 16:21:45 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 2 16:24:59 2016 +0200

----------------------------------------------------------------------
 .../flink/examples/java/JavaSQLExample.java     | 72 ----------------
 .../flink/examples/java/JavaTableExample.java   | 71 ----------------
 .../flink/examples/java/WordCountSQL.java       | 87 ++++++++++++++++++++
 .../flink/examples/java/WordCountTable.java     | 85 +++++++++++++++++++
 .../flink/examples/scala/StreamSQLExample.scala | 22 ++++-
 .../examples/scala/StreamTableExample.scala     | 18 +++-
 .../flink/examples/scala/TPCHQuery3Table.scala  |  7 +-
 .../flink/examples/scala/WordCountSQL.scala     | 23 +++++-
 .../flink/examples/scala/WordCountTable.scala   | 23 +++++-
 .../java/batch/table/AggregationsITCase.java    |  6 +-
 .../scala/batch/table/AggregationsITCase.scala  |  6 +-
 11 files changed, 256 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
deleted file mode 100644
index bbac94a..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-
-/**
- * Simple example that shows how the Batch SQL used in Java.
- */
-public class JavaSQLExample {
-
-	public static class WC {
-		public String word;
-		public long frequency;
-
-		// Public constructor to make it a Flink POJO
-		public WC() {
-
-		}
-
-		public WC(String word, long frequency) {
-			this.word = word;
-			this.frequency = frequency;
-		}
-
-		@Override
-		public String toString() {
-			return "WC " + word + " " + frequency;
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		// set up execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<WC> input = env.fromElements(
-			new WC("Hello", 1),
-			new WC("Ciao", 1),
-			new WC("Hello", 1));
-
-		// register the DataSet as table "WordCount"
-		tableEnv.registerDataSet("WordCount", input, "word, frequency");
-		// run a SQL query on the Table and retrieve the result as a new Table
-		Table table = tableEnv.sql(
-			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
-
-		DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
-
-		result.print();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
deleted file mode 100644
index 0a776a4..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-
-/**
- * Very simple example that shows how the Java Table API can be used.
- */
-public class JavaTableExample {
-
-	public static class WC {
-		public String word;
-		public long count;
-
-		// Public constructor to make it a Flink POJO
-		public WC() {
-
-		}
-
-		public WC(String word, long count) {
-			this.word = word;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return "WC " + word + " " + count;
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<WC> input = env.fromElements(
-				new WC("Hello", 1),
-				new WC("Ciao", 1),
-				new WC("Hello", 1));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table filtered = table
-				.groupBy("word")
-				.select("word.count as count, word")
-				.filter("count = 2");
-
-		DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
-
-		result.print();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
new file mode 100644
index 0000000..c9043d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Simple example that shows how the Batch SQL API is used in Java.
+ *
+ * This example shows how to:
+ *  - Convert DataSets to Tables
+ *  - Register a Table under a name
+ *  - Run a SQL query on the registered Table
+ *
+ */
+public class WordCountSQL {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<WC> input = env.fromElements(
+			new WC("Hello", 1),
+			new WC("Ciao", 1),
+			new WC("Hello", 1));
+
+		// register the DataSet as table "WordCount"
+		tEnv.registerDataSet("WordCount", input, "word, frequency");
+
+		// run a SQL query on the Table and retrieve the result as a new Table
+		Table table = tEnv.sql(
+			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
+
+		DataSet<WC> result = tEnv.toDataSet(table, WC.class);
+
+		result.print();
+	}
+
+	// *************************************************************************
+	//     USER DATA TYPES
+	// *************************************************************************
+
+	public static class WC {
+		public String word;
+		public long frequency;
+
+		// public constructor to make it a Flink POJO
+		public WC() {
+		}
+
+		public WC(String word, long frequency) {
+			this.word = word;
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "WC " + word + " " + frequency;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
new file mode 100644
index 0000000..c339cf1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+  * Simple example for demonstrating the use of the Table API for a Word Count in Java.
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Apply group, aggregate, select, and filter operations
+  *
+  */
+public class WordCountTable {
+
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<WC> input = env.fromElements(
+				new WC("Hello", 1),
+				new WC("Ciao", 1),
+				new WC("Hello", 1));
+
+		Table table = tEnv.fromDataSet(input);
+
+		Table filtered = table
+				.groupBy("word")
+				.select("word, frequency.sum as frequency")
+				.filter("frequency = 2");
+
+		DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
+
+		result.print();
+	}
+
+	// *************************************************************************
+	//     USER DATA TYPES
+	// *************************************************************************
+
+	public static class WC {
+		public String word;
+		public long frequency;
+
+		// public constructor to make it a Flink POJO
+		public WC() {
+
+		}
+
+		public WC(String word, long frequency) {
+			this.word = word;
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "WC " + word + " " + frequency;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
index 8eed77d..5e0c773 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -23,11 +23,19 @@ import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 
 /**
-  * Simple example for demonstrating the use of SQL on Stream Table.
+  * Simple example for demonstrating the use of SQL on a Stream Table.
+  *
+  * This example shows how to:
+  *  - Convert DataStreams to Tables
+  *  - Register a Table under a name
+  *  - Run a StreamSQL query on the registered Table
+  *
   */
 object StreamSQLExample {
 
-  case class Order(user: Long, product: String, amount: Int)
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
 
   def main(args: Array[String]): Unit = {
 
@@ -45,11 +53,11 @@ object StreamSQLExample {
       Order(2L, "rubber", 3),
       Order(4L, "beer", 1)))
 
-    // register the DataStream under the name "OrderA" and "OrderB"
+    // register the DataStreams under the name "OrderA" and "OrderB"
     tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
     tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
 
-    // Union two tables
+    // union the two tables
     val result = tEnv.sql(
       "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
         "SELECT STREAM * FROM OrderB WHERE amount < 2")
@@ -59,4 +67,10 @@ object StreamSQLExample {
     env.execute()
   }
 
+	// *************************************************************************
+	//     USER DATA TYPES
+	// *************************************************************************
+
+  case class Order(user: Long, product: String, amount: Int)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
index 9081f50..2ce2684 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
@@ -23,11 +23,18 @@ import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 
 /**
-  * Simple example for demonstrating the use of Table API on Stream Table.
+  * Simple example for demonstrating the use of Table API on a Stream Table.
+  *
+  * This example shows how to:
+  *  - Convert DataStreams to Tables
+  *  - Apply union, select, and filter operations
+  *
   */
 object StreamTableExample {
 
-  case class Order(user: Long, product: String, amount: Int)
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
 
   def main(args: Array[String]): Unit = {
 
@@ -45,6 +52,7 @@ object StreamTableExample {
       Order(2L, "rubber", 3),
       Order(4L, "beer", 1))).toTable(tEnv)
 
+    // union the two tables
     val result: DataStream[Order] = orderA.unionAll(orderB)
       .select('user, 'product, 'amount)
       .where('amount > 2)
@@ -55,4 +63,10 @@ object StreamTableExample {
     env.execute()
   }
 
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Order(user: Long, product: String, amount: Int)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
index a761f4f..a950988 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -54,9 +54,6 @@ import org.apache.flink.api.table.TableEnvironment
   *      o_orderdate;
   * }}}
   *
-  * Compared to the original TPC-H query this version does not sort the result by revenue
-  * and orderdate.
-  *
   * Input files are plain text CSV files using the pipe character ('|') as field separator
   * as generated by the TPC-H data generator which is available at
   * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
@@ -73,6 +70,10 @@ import org.apache.flink.api.table.TableEnvironment
   */
 object TPCHQuery3Table {
 
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
   def main(args: Array[String]) {
     if (!parseParameters(args)) {
       return

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
index 41efffc..96a603e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
@@ -22,10 +22,19 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.TableEnvironment
 
 /**
-  * Simple example that shows how the Batch SQL used in Scala.
+  * Simple example that shows how the Batch SQL API is used in Scala.
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Register a Table under a name
+  *  - Run a SQL query on the registered Table
+  *
   */
 object WordCountSQL {
-  case class WC(word: String, count: Int)
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
 
   def main(args: Array[String]): Unit = {
 
@@ -34,10 +43,20 @@ object WordCountSQL {
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
     val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+
+    // register the DataSet as table "WordCount"
     tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
 
+    // run a SQL query on the Table and retrieve the result as a new Table
     val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
 
     table.toDataSet[WC].print()
   }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class WC(word: String, frequency: Long)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
index 3c53fc4..587a716 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -23,11 +23,18 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.TableEnvironment
 
 /**
- * Simple example for demonstrating the use of the Table API for a Word Count.
- */
+  * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Apply group, aggregate, select, and filter operations
+  *
+  */
 object WordCountTable {
 
-  case class WC(word: String, count: Int)
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
 
   def main(args: Array[String]): Unit = {
 
@@ -39,9 +46,17 @@ object WordCountTable {
     val expr = input.toTable(tEnv)
     val result = expr
       .groupBy('word)
-      .select('word, 'count.sum as 'count)
+      .select('word, 'frequency.sum as 'frequency)
+      .filter('frequency === 2)
       .toDataSet[WC]
 
     result.print()
   }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class WC(word: String, frequency: Long)
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index ddf5884..6df3f65 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.JavaTableExample.WC;
+import org.apache.flink.examples.java.WordCountTable.WC;
 
 import java.util.List;
 
@@ -196,8 +196,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 
 		Table filtered = table
 				.groupBy("word")
-				.select("word.count as count, word")
-				.filter("count = 2");
+				.select("word.frequency as frequency, word")
+				.filter("frequency = 2");
 
 		List<String> result = tableEnv.toDataSet(filtered, WC.class)
 				.map(new MapFunction<WC, String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index 6c413e5..7c0cdff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -195,11 +195,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     val expr = input.toTable(tEnv)
     val result = expr
       .groupBy('word)
-      .select('word, 'count.sum as 'count)
-      .filter('count === 2)
+      .select('word, 'frequency.sum as 'frequency)
+      .filter('frequency === 2)
       .toDataSet[MyWC]
 
-    val mappedResult = result.map(w => (w.word, w.count * 10)).collect()
+    val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect()
     val expected = "(hello,20)\n" + "(hola,20)"
     TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
   }


[2/2] flink git commit: [FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples

Posted by tw...@apache.org.
[FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples

This closes #2274.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123c637e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123c637e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123c637e

Branch: refs/heads/master
Commit: 123c637e804bfdd6569051cf705ec73b5cb95352
Parents: 45df1b2
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Jul 21 00:31:01 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 2 16:24:59 2016 +0200

----------------------------------------------------------------------
 .../flink/examples/java/JavaSQLExample.java     | 72 ++++++++++++++++++++
 .../flink/examples/scala/StreamSQLExample.scala | 62 +++++++++++++++++
 .../examples/scala/StreamTableExample.scala     | 58 ++++++++++++++++
 .../flink/examples/scala/WordCountSQL.scala     | 43 ++++++++++++
 4 files changed, 235 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
new file mode 100644
index 0000000..bbac94a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Simple example that shows how the Batch SQL used in Java.
+ */
+public class JavaSQLExample {
+
+	public static class WC {
+		public String word;
+		public long frequency;
+
+		// Public constructor to make it a Flink POJO
+		public WC() {
+
+		}
+
+		public WC(String word, long frequency) {
+			this.word = word;
+			this.frequency = frequency;
+		}
+
+		@Override
+		public String toString() {
+			return "WC " + word + " " + frequency;
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<WC> input = env.fromElements(
+			new WC("Hello", 1),
+			new WC("Ciao", 1),
+			new WC("Hello", 1));
+
+		// register the DataSet as table "WordCount"
+		tableEnv.registerDataSet("WordCount", input, "word, frequency");
+		// run a SQL query on the Table and retrieve the result as a new Table
+		Table table = tableEnv.sql(
+			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
+
+		DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
+
+		result.print();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
new file mode 100644
index 0000000..8eed77d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Simple example for demonstrating the use of SQL on Stream Table.
+  */
+object StreamSQLExample {
+
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val orderA: DataStream[Order] = env.fromCollection(Seq(
+      Order(1L, "beer", 3),
+      Order(1L, "diaper", 4),
+      Order(3L, "rubber", 2)))
+
+    val orderB: DataStream[Order] = env.fromCollection(Seq(
+      Order(2L, "pen", 3),
+      Order(2L, "rubber", 3),
+      Order(4L, "beer", 1)))
+
+    // register the DataStream under the name "OrderA" and "OrderB"
+    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
+
+    // Union two tables
+    val result = tEnv.sql(
+      "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
+        "SELECT STREAM * FROM OrderB WHERE amount < 2")
+
+    result.toDataStream[Order].print()
+
+    env.execute()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
new file mode 100644
index 0000000..9081f50
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Simple example for demonstrating the use of Table API on Stream Table.
+  */
+object StreamTableExample {
+
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val orderA = env.fromCollection(Seq(
+      Order(1L, "beer", 3),
+      Order(1L, "diaper", 4),
+      Order(3L, "rubber", 2))).toTable(tEnv)
+
+    val orderB = env.fromCollection(Seq(
+      Order(2L, "pen", 3),
+      Order(2L, "rubber", 3),
+      Order(4L, "beer", 1))).toTable(tEnv)
+
+    val result: DataStream[Order] = orderA.unionAll(orderB)
+      .select('user, 'product, 'amount)
+      .where('amount > 2)
+      .toDataStream[Order]
+
+    result.print()
+
+    env.execute()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
new file mode 100644
index 0000000..41efffc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+
+/**
+  * Simple example that shows how the Batch SQL used in Scala.
+  */
+object WordCountSQL {
+  case class WC(word: String, count: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
+
+    val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+
+    table.toDataSet[WC].print()
+  }
+}