You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/02 14:25:45 UTC
[1/2] flink git commit: [FLINK-4180] [FLINK-4181] [table] Ensure
examples consistency
Repository: flink
Updated Branches:
refs/heads/master 45df1b25b -> a1fef27b9
[FLINK-4180] [FLINK-4181] [table] Ensure examples consistency
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1fef27b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1fef27b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1fef27b
Branch: refs/heads/master
Commit: a1fef27b9c9ddd03b3de9bfe35f1d9e7d973c756
Parents: 123c637e
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 2 16:21:45 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 2 16:24:59 2016 +0200
----------------------------------------------------------------------
.../flink/examples/java/JavaSQLExample.java | 72 ----------------
.../flink/examples/java/JavaTableExample.java | 71 ----------------
.../flink/examples/java/WordCountSQL.java | 87 ++++++++++++++++++++
.../flink/examples/java/WordCountTable.java | 85 +++++++++++++++++++
.../flink/examples/scala/StreamSQLExample.scala | 22 ++++-
.../examples/scala/StreamTableExample.scala | 18 +++-
.../flink/examples/scala/TPCHQuery3Table.scala | 7 +-
.../flink/examples/scala/WordCountSQL.scala | 23 +++++-
.../flink/examples/scala/WordCountTable.scala | 23 +++++-
.../java/batch/table/AggregationsITCase.java | 6 +-
.../scala/batch/table/AggregationsITCase.scala | 6 +-
11 files changed, 256 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
deleted file mode 100644
index bbac94a..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-
-/**
- * Simple example that shows how the Batch SQL used in Java.
- */
-public class JavaSQLExample {
-
- public static class WC {
- public String word;
- public long frequency;
-
- // Public constructor to make it a Flink POJO
- public WC() {
-
- }
-
- public WC(String word, long frequency) {
- this.word = word;
- this.frequency = frequency;
- }
-
- @Override
- public String toString() {
- return "WC " + word + " " + frequency;
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- // set up execution environment
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<WC> input = env.fromElements(
- new WC("Hello", 1),
- new WC("Ciao", 1),
- new WC("Hello", 1));
-
- // register the DataSet as table "WordCount"
- tableEnv.registerDataSet("WordCount", input, "word, frequency");
- // run a SQL query on the Table and retrieve the result as a new Table
- Table table = tableEnv.sql(
- "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
-
- DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
-
- result.print();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
deleted file mode 100644
index 0a776a4..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-
-/**
- * Very simple example that shows how the Java Table API can be used.
- */
-public class JavaTableExample {
-
- public static class WC {
- public String word;
- public long count;
-
- // Public constructor to make it a Flink POJO
- public WC() {
-
- }
-
- public WC(String word, long count) {
- this.word = word;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "WC " + word + " " + count;
- }
- }
-
- public static void main(String[] args) throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
- DataSet<WC> input = env.fromElements(
- new WC("Hello", 1),
- new WC("Ciao", 1),
- new WC("Hello", 1));
-
- Table table = tableEnv.fromDataSet(input);
-
- Table filtered = table
- .groupBy("word")
- .select("word.count as count, word")
- .filter("count = 2");
-
- DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
-
- result.print();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
new file mode 100644
index 0000000..c9043d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountSQL.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Simple example that shows how the Batch SQL API is used in Java.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Register a Table under a name
+ * - Run a SQL query on the registered Table
+ *
+ */
+public class WordCountSQL {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ // set up execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+ DataSet<WC> input = env.fromElements(
+ new WC("Hello", 1),
+ new WC("Ciao", 1),
+ new WC("Hello", 1));
+
+ // register the DataSet as table "WordCount"
+ tEnv.registerDataSet("WordCount", input, "word, frequency");
+
+ // run a SQL query on the Table and retrieve the result as a new Table
+ Table table = tEnv.sql(
+ "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
+
+ DataSet<WC> result = tEnv.toDataSet(table, WC.class);
+
+ result.print();
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ public static class WC {
+ public String word;
+ public long frequency;
+
+ // public constructor to make it a Flink POJO
+ public WC() {
+ }
+
+ public WC(String word, long frequency) {
+ this.word = word;
+ this.frequency = frequency;
+ }
+
+ @Override
+ public String toString() {
+ return "WC " + word + " " + frequency;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
new file mode 100644
index 0000000..c339cf1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/WordCountTable.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Simple example for demonstrating the use of the Table API for a Word Count in Java.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Apply group, aggregate, select, and filter operations
+ *
+ */
+public class WordCountTable {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+ BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+ DataSet<WC> input = env.fromElements(
+ new WC("Hello", 1),
+ new WC("Ciao", 1),
+ new WC("Hello", 1));
+
+ Table table = tEnv.fromDataSet(input);
+
+ Table filtered = table
+ .groupBy("word")
+ .select("word, frequency.sum as frequency")
+ .filter("frequency = 2");
+
+ DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
+
+ result.print();
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ public static class WC {
+ public String word;
+ public long frequency;
+
+ // public constructor to make it a Flink POJO
+ public WC() {
+
+ }
+
+ public WC(String word, long frequency) {
+ this.word = word;
+ this.frequency = frequency;
+ }
+
+ @Override
+ public String toString() {
+ return "WC " + word + " " + frequency;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
index 8eed77d..5e0c773 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -23,11 +23,19 @@ import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
- * Simple example for demonstrating the use of SQL on Stream Table.
+ * Simple example for demonstrating the use of SQL on a Stream Table.
+ *
+ * This example shows how to:
+ * - Convert DataStreams to Tables
+ * - Register a Table under a name
+ * - Run a StreamSQL query on the registered Table
+ *
*/
object StreamSQLExample {
- case class Order(user: Long, product: String, amount: Int)
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
def main(args: Array[String]): Unit = {
@@ -45,11 +53,11 @@ object StreamSQLExample {
Order(2L, "rubber", 3),
Order(4L, "beer", 1)))
- // register the DataStream under the name "OrderA" and "OrderB"
+ // register the DataStreams under the name "OrderA" and "OrderB"
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
- // Union two tables
+ // union the two tables
val result = tEnv.sql(
"SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT STREAM * FROM OrderB WHERE amount < 2")
@@ -59,4 +67,10 @@ object StreamSQLExample {
env.execute()
}
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Order(user: Long, product: String, amount: Int)
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
index 9081f50..2ce2684 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
@@ -23,11 +23,18 @@ import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
- * Simple example for demonstrating the use of Table API on Stream Table.
+ * Simple example for demonstrating the use of Table API on a Stream Table.
+ *
+ * This example shows how to:
+ * - Convert DataStreams to Tables
+ * - Apply union, select, and filter operations
+ *
*/
object StreamTableExample {
- case class Order(user: Long, product: String, amount: Int)
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
def main(args: Array[String]): Unit = {
@@ -45,6 +52,7 @@ object StreamTableExample {
Order(2L, "rubber", 3),
Order(4L, "beer", 1))).toTable(tEnv)
+ // union the two tables
val result: DataStream[Order] = orderA.unionAll(orderB)
.select('user, 'product, 'amount)
.where('amount > 2)
@@ -55,4 +63,10 @@ object StreamTableExample {
env.execute()
}
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Order(user: Long, product: String, amount: Int)
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
index a761f4f..a950988 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -54,9 +54,6 @@ import org.apache.flink.api.table.TableEnvironment
* o_orderdate;
* }}}
*
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
@@ -73,6 +70,10 @@ import org.apache.flink.api.table.TableEnvironment
*/
object TPCHQuery3Table {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
index 41efffc..96a603e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
@@ -22,10 +22,19 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
/**
- * Simple example that shows how the Batch SQL used in Scala.
+ * Simple example that shows how the Batch SQL API is used in Scala.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Register a Table under a name
+ * - Run a SQL query on the registered Table
+ *
*/
object WordCountSQL {
- case class WC(word: String, count: Int)
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
def main(args: Array[String]): Unit = {
@@ -34,10 +43,20 @@ object WordCountSQL {
val tEnv = TableEnvironment.getTableEnvironment(env)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+
+ // register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
+ // run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
}
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class WC(word: String, frequency: Long)
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
index 3c53fc4..587a716 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -23,11 +23,18 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
/**
- * Simple example for demonstrating the use of the Table API for a Word Count.
- */
+ * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Apply group, aggregate, select, and filter operations
+ *
+ */
object WordCountTable {
- case class WC(word: String, count: Int)
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
def main(args: Array[String]): Unit = {
@@ -39,9 +46,17 @@ object WordCountTable {
val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
- .select('word, 'count.sum as 'count)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
.toDataSet[WC]
result.print()
}
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class WC(word: String, frequency: Long)
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index ddf5884..6df3f65 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.JavaTableExample.WC;
+import org.apache.flink.examples.java.WordCountTable.WC;
import java.util.List;
@@ -196,8 +196,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
Table filtered = table
.groupBy("word")
- .select("word.count as count, word")
- .filter("count = 2");
+ .select("word.count as frequency, word")
+ .filter("frequency = 2");
List<String> result = tableEnv.toDataSet(filtered, WC.class)
.map(new MapFunction<WC, String>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1fef27b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index 6c413e5..7c0cdff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -195,11 +195,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
- .select('word, 'count.sum as 'count)
- .filter('count === 2)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
.toDataSet[MyWC]
- val mappedResult = result.map(w => (w.word, w.count * 10)).collect()
+ val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect()
val expected = "(hello,20)\n" + "(hola,20)"
TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
}
[2/2] flink git commit: [FLINK-4180] [FLINK-4181] [table] add Batch
SQL and Stream SQL and Stream Table API examples
Posted by tw...@apache.org.
[FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples
This closes #2274.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123c637e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123c637e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123c637e
Branch: refs/heads/master
Commit: 123c637e804bfdd6569051cf705ec73b5cb95352
Parents: 45df1b2
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Jul 21 00:31:01 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Aug 2 16:24:59 2016 +0200
----------------------------------------------------------------------
.../flink/examples/java/JavaSQLExample.java | 72 ++++++++++++++++++++
.../flink/examples/scala/StreamSQLExample.scala | 62 +++++++++++++++++
.../examples/scala/StreamTableExample.scala | 58 ++++++++++++++++
.../flink/examples/scala/WordCountSQL.scala | 43 ++++++++++++
4 files changed, 235 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
new file mode 100644
index 0000000..bbac94a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.table.TableEnvironment;
+
+/**
+ * Simple example that shows how the Batch SQL used in Java.
+ */
+public class JavaSQLExample {
+
+ public static class WC {
+ public String word;
+ public long frequency;
+
+ // Public constructor to make it a Flink POJO
+ public WC() {
+
+ }
+
+ public WC(String word, long frequency) {
+ this.word = word;
+ this.frequency = frequency;
+ }
+
+ @Override
+ public String toString() {
+ return "WC " + word + " " + frequency;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ // set up execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+ DataSet<WC> input = env.fromElements(
+ new WC("Hello", 1),
+ new WC("Ciao", 1),
+ new WC("Hello", 1));
+
+ // register the DataSet as table "WordCount"
+ tableEnv.registerDataSet("WordCount", input, "word, frequency");
+ // run a SQL query on the Table and retrieve the result as a new Table
+ Table table = tableEnv.sql(
+ "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
+
+ DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
+
+ result.print();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
new file mode 100644
index 0000000..8eed77d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Simple example demonstrating SQL on streaming tables: two streams of
+  * orders are registered as tables and combined with a UNION ALL query.
+  */
+object StreamSQLExample {
+
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // streaming environment and the table environment built on top of it
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+
+    val firstOrders: DataStream[Order] = streamEnv.fromCollection(Seq(
+      Order(1L, "beer", 3),
+      Order(1L, "diaper", 4),
+      Order(3L, "rubber", 2)))
+
+    val secondOrders: DataStream[Order] = streamEnv.fromCollection(Seq(
+      Order(2L, "pen", 3),
+      Order(2L, "rubber", 3),
+      Order(4L, "beer", 1)))
+
+    // register both DataStreams as tables named "OrderA" and "OrderB"
+    tableEnv.registerDataStream("OrderA", firstOrders, 'user, 'product, 'amount)
+    tableEnv.registerDataStream("OrderB", secondOrders, 'user, 'product, 'amount)
+
+    // orders from OrderA with amount > 2, unioned with orders from OrderB with amount < 2
+    val unionQuery =
+      "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
+        "SELECT STREAM * FROM OrderB WHERE amount < 2"
+    val unioned = tableEnv.sql(unionQuery)
+
+    // convert the result table back into a stream of Order and print it
+    val output = unioned.toDataStream[Order]
+    output.print()
+
+    streamEnv.execute()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
new file mode 100644
index 0000000..9081f50
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Simple example demonstrating the Table API on streaming tables: two
+  * streams of orders are converted to tables, unioned and filtered.
+  */
+object StreamTableExample {
+
+  case class Order(user: Long, product: String, amount: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // streaming environment and the table environment built on top of it
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+
+    // first input stream, converted to a Table
+    val firstStream = streamEnv.fromCollection(Seq(
+      Order(1L, "beer", 3),
+      Order(1L, "diaper", 4),
+      Order(3L, "rubber", 2)))
+    val firstTable = firstStream.toTable(tableEnv)
+
+    // second input stream, converted to a Table
+    val secondStream = streamEnv.fromCollection(Seq(
+      Order(2L, "pen", 3),
+      Order(2L, "rubber", 3),
+      Order(4L, "beer", 1)))
+    val secondTable = secondStream.toTable(tableEnv)
+
+    // union both tables, then keep only the orders with amount > 2
+    val largeOrders = firstTable
+      .unionAll(secondTable)
+      .select('user, 'product, 'amount)
+      .where('amount > 2)
+
+    // convert the filtered table back into a stream of Order and print it
+    val output: DataStream[Order] = largeOrders.toDataStream[Order]
+    output.print()
+
+    streamEnv.execute()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/123c637e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
new file mode 100644
index 0000000..41efffc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+
+/**
+  * Simple example that shows how batch SQL is used in Scala: a DataSet of
+  * [[WordCountSQL.WC]] records is registered as a table, aggregated with a
+  * SQL query, and the result is converted back into a DataSet.
+  */
+object WordCountSQL {
+
+  // The second field is called `frequency` so that the case class agrees with
+  // the column name used when registering the table and in the SQL query.
+  case class WC(word: String, frequency: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+
+    // register the DataSet as table "WordCount" with columns word and frequency
+    tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
+
+    // run a SQL query on the table and retrieve the result as a new Table
+    val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+
+    // convert the result back into WC records and print them
+    table.toDataSet[WC].print()
+  }
+}