Posted to commits@flink.apache.org by va...@apache.org on 2016/04/11 18:02:40 UTC

[1/2] flink git commit: [FLINK-3640] [docs] extend the Table API docs and add a section about embedded SQL mode

Repository: flink
Updated Branches:
  refs/heads/master e16ca8460 -> ef7f9ac9a


[FLINK-3640] [docs] extend the Table API docs and add a section about embedded SQL mode

This closes #1867


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef7f9ac9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef7f9ac9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef7f9ac9

Branch: refs/heads/master
Commit: ef7f9ac9a2b73d0cde5d961ae13f5737473e27bf
Parents: 9e05439
Author: vasia <va...@apache.org>
Authored: Mon Apr 11 15:14:55 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 18:01:40 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/libs/index.md |   2 +-
 docs/apis/batch/libs/table.md | 174 ++++++++++++++++++++++++++++++++++---
 2 files changed, 164 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef7f9ac9/docs/apis/batch/libs/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/index.md b/docs/apis/batch/libs/index.md
index b2df0c4..111eaa9 100644
--- a/docs/apis/batch/libs/index.md
+++ b/docs/apis/batch/libs/index.md
@@ -26,4 +26,4 @@ under the License.
 
 - Graph processing: [Gelly](gelly_guide.html)
 - Machine Learning: [FlinkML](ml/index.html)
-- Relational Queries: [Table](table.html)
+- Relational Queries: [Table and SQL](table.html)

http://git-wip-us.apache.org/repos/asf/flink/blob/ef7f9ac9/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index 316c433..527d10d 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -1,15 +1,15 @@
 ---
-title: "Table API - Relational Queries"
+title: "Table API and SQL"
 is_beta: true
 # Top navigation
 top-nav-group: libs
 top-nav-pos: 3
-top-nav-title: "Relational: Table"
+top-nav-title: "Table API and SQL"
 # Sub navigation
 sub-nav-group: batch
 sub-nav-parent: libs
 sub-nav-pos: 3
-sub-nav-title: Table
+sub-nav-title: Table API and SQL
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -30,13 +30,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+
 **The Table API: an experimental feature**
 
-Flink provides an API that allows specifying operations using SQL-like expressions. Instead of
-manipulating a `DataSet` you can work with a `Table` on which relational operations can
-be performed.
+Flink's Table API is a SQL-like expression language embedded in Java and Scala.
+Instead of manipulating a `DataSet` or `DataStream`, you can create and work with a relational `Table` abstraction.
+Tables have a schema and allow running relational operations on them, including selection, aggregation, and joins.
+A `Table` can be created from a `DataSet` or a `DataStream` and then queried either using the Table API operators or using SQL queries.
+Once a `Table` is converted back to a `DataSet` or `DataStream`, the defined relational plan is optimized using [Apache Calcite](https://calcite.apache.org/)
+and transformed into a `DataSet` or `DataStream` execution plan.
+
+* This will be replaced by the TOC
+{:toc}
+
+Using the Table API and SQL
+----------------------------
 
-The following dependency must be added to your project in order to use the Table API:
+The Table API and SQL are part of the *flink-libraries* Maven project.
+The following dependency must be added to your project in order to use the Table API and SQL:
 
 {% highlight xml %}
 <dependency>
@@ -48,7 +59,13 @@ The following dependency must be added to your project in order to use the Table
 
 Note that the Table API is currently not part of the binary distribution. For cluster execution, see [linking with modules not contained in the binary distribution]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-## Scala Table API
+Table API
+----------
+The Table API provides methods for running relational operations on Tables, both in Scala and Java.
+In the following sections you can find examples that show how to create Tables, how to define and execute relational queries on them,
+and how to retrieve the result of a query as a `DataSet`.
+
+### Scala Table API
 
 The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
 implicit conversions that allow
@@ -90,7 +107,9 @@ in this example we see that you can also use Strings to specify relational expre
 Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
 description of the expression syntax.
 
-## Java Table API
+{% top %}
+
+### Java Table API
 
 When using Java, Tables can be converted to and from DataSet using `TableEnvironment`.
 This example is equivalent to the above Scala Example:
@@ -131,7 +150,9 @@ DataSet<WC> result = tableEnv.toDataSet(wordCounts, WC.class);
 When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
 are supported. They support exactly the same feature set as the expression DSL.
 
-## Table API Operators
+{% top %}
+
+### Table API Operators
 The Table API provides a domain-specific language to execute language-integrated queries on structured data in Scala and Java.
 This section gives a brief overview of all available operators. You can find more details of operators in the [Javadoc]({{site.baseurl}}/api/java/org/apache/flink/api/table/Table.html).
 
@@ -367,7 +388,9 @@ val result = in.distinct();
 </div>
 </div>
 
-## Expression Syntax
+{% top %}
+
+### Expression Syntax
 Some of the operators in the previous section expect an expression. These can either be specified using an embedded Scala DSL or
 a String expression. Please refer to the examples above to learn how expressions can be
 formulated.
@@ -408,3 +431,132 @@ Here, `literal` is a valid Java literal and `field reference` specifies a column
 column names follow Java identifier syntax.
 
 Only the types `LONG` and `STRING` can be cast to `DATE` and vice versa. A `LONG` cast to `DATE` must be a milliseconds timestamp. A `STRING` cast to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", "`HH:mm:ss`", or be a milliseconds timestamp. By default, all timestamps are in the UTC timezone and are counted in milliseconds from January 1, 1970, 00:00:00.
+
+{% top %}
+
+SQL
+----
+The Table API also supports embedded SQL queries.
+In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
+A registered `Table` can be retrieved from the `TableEnvironment` using the `scan` method:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// create a Table environment
+TableEnvironment tableEnv = new TableEnvironment();
+// reset the translation context: this will erase existing registered Tables
+TranslationContext.reset();
+// read a DataSet from an external source
+DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
+// register the DataSet under the name "MyTable"
+tableEnv.registerDataSet("MyTable", ds);
+// retrieve "MyTable" into a new Table
+Table t = tableEnv.scan("MyTable");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+// create a Table environment
+val tableEnv = new TableEnvironment
+// reset the translation context: this will erase existing registered Tables
+TranslationContext.reset()
+// read a DataSet from an external source
+val ds = env.readCsvFile(...)
+// register the DataSet under the name "MyTable"
+tableEnv.registerDataSet("MyTable", ds)
+// retrieve "MyTable" into a new Table
+val t = tableEnv.scan("MyTable")
+{% endhighlight %}
+</div>
+</div>
+
+*Note: Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as this is reserved for internal use only.*
+
+When registering a `DataSet`, one can also name the columns of the resulting `Table`. For example, if "MyTable" should have the three columns `user`, `product`, and `order`, the names can be passed when registering the `DataSet` as shown below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// register the DataSet under the name "MyTable" with columns user, product, and order
+tableEnv.registerDataSet("MyTable", ds, "user, product, order");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// register the DataSet under the name "MyTable" with columns user, product, and order
+tableEnv.registerDataSet("MyTable", ds, 'user, 'product, 'order)
+{% endhighlight %}
+</div>
+</div>
+
+A `Table` can be registered in a similar way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// read a DataSet from an external source
+DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
+// create a Table from the DataSet with columns user, product, and order
+Table t = tableEnv.fromDataSet(ds).as("user, product, order");
+// register the Table under the name "MyTable"
+tableEnv.registerTable("MyTable", t);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// read a DataSet from an external source and
+// create a Table from the DataSet with columns user, product, and order
+val t = env.readCsvFile(...).as('user, 'product, 'order)
+// register the Table under the name "MyTable"
+tableEnv.registerTable("MyTable", t)
+{% endhighlight %}
+</div>
+</div>
+
+After registering a `Table` or `DataSet`, one can use them in SQL queries. A SQL query is defined using the `sql` method of the `TableEnvironment`.
+The result of the method is a new `Table` which can either be converted back to a `DataSet` or used in subsequent Table API queries.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// create a Table environment
+TableEnvironment tableEnv = new TableEnvironment();
+// reset the translation context: this will erase existing registered Tables
+TranslationContext.reset();
+// read a DataSet from an external source
+DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
+// create a Table from the DataSet
+Table t = tableEnv.fromDataSet(ds);
+// register the Table under the name "MyTable"
+tableEnv.registerTable("MyTable", t);
+// run a sql query and retrieve the result in a new Table
+Table result = tableEnv.sql("SELECT * FROM MyTable");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+// create a Table environment
+val tableEnv = new TableEnvironment
+// reset the translation context: this will erase existing registered Tables
+TranslationContext.reset()
+// create a Table
+val t = env.readCsvFile(...).as('a, 'b, 'c)
+// register the Table under the name "MyTable"
+tableEnv.registerTable("MyTable", t)
+// run a sql query and retrieve the result in a new Table
+val result = tableEnv.sql("SELECT * FROM MyTable")
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
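
The note in the added docs about reserved table names can be illustrated with a small standalone check. This is only a sketch: the class and method names are hypothetical and not part of Flink, and requiring the whole name to match the pattern is an assumption about how the restriction is applied.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: checks a table name against the
// reserved pattern quoted in the docs above.
public class ReservedTableNames {

    // Pattern copied from the note in the docs.
    private static final Pattern RESERVED = Pattern.compile("^_DataSetTable_[0-9]+");

    // Assumption: a name is reserved only if the whole name matches the pattern.
    static boolean isReserved(String name) {
        return RESERVED.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isReserved("_DataSetTable_7")); // prints "true"
        System.out.println(isReserved("MyTable"));         // prints "false"
    }
}
```

A caller could run such a check before `registerDataSet` or `registerTable` to reject names that would collide with internally generated ones.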


[2/2] flink git commit: [FLINK-3579] Improve String concatenation in the Table API

Posted by va...@apache.org.
[FLINK-3579] Improve String concatenation in the Table API

This closes #1821


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e054393
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e054393
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e054393

Branch: refs/heads/master
Commit: 9e054393bbe7190bd239bd2bb926d588aa5d1c6f
Parents: e16ca84
Author: ramkrishna <ra...@gmail.com>
Authored: Mon Mar 21 12:52:31 2016 +0530
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 18:01:40 2016 +0200

----------------------------------------------------------------------
 .../api/table/plan/RexNodeTranslator.scala      | 14 ++++++-
 .../table/test/StringExpressionsITCase.java     | 43 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 1668efb..b50b74b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeutils.TypeConverter
 
@@ -134,7 +136,17 @@ object RexNodeTranslator {
       case Plus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        if(SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
+          val cast: RexNode = relBuilder.cast(r,
+            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+          relBuilder.call(SqlStdOperatorTable.PLUS, l, cast)
+        } else if(SqlTypeName.STRING_TYPES.contains(r.getType.getSqlTypeName)) {
+          val cast: RexNode = relBuilder.cast(l,
+            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+          relBuilder.call(SqlStdOperatorTable.PLUS, cast, r)
+        } else {
+          relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        }
       case Minus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)

http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 65f0470..86a3bfd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -150,7 +149,47 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
 		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
 		// Must fail because the comparison here is between String(column 'c') and (Integer 10)
-		Table res = in.filter("c > 10" );
+		Table res = in.filter("c > 10");
 		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
 	}
+
+	@Test
+	public void testStringConcat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("a + b + 42");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "ABCD342\nABCD242";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testStringConcat1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("42 + b + a");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "44ABCD\n45ABCD";
+		compareResultAsText(results, expected);
+	}
 }
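
The coercion rule that the `Plus` case in `RexNodeTranslator` introduces, and that the two new tests exercise, can be modelled in plain Java. This is a hedged sketch, not the Calcite-based implementation: `PlusCoercionSketch` and `plus` are hypothetical names, values are plain Java objects instead of `RexNode`s, and numeric operands are simply added as `long`.

```java
// Hypothetical model of the rule in the patch: if either operand of "+" is a
// string, the other operand is converted to a string and the two are
// concatenated; otherwise the operands are added numerically.
public class PlusCoercionSketch {

    static Object plus(Object left, Object right) {
        if (left instanceof String || right instanceof String) {
            // Mirrors the cast-to-string branches of the patch.
            return String.valueOf(left) + String.valueOf(right);
        }
        // Simplification for this sketch: all numbers are treated as long.
        return ((Number) left).longValue() + ((Number) right).longValue();
    }

    public static void main(String[] args) {
        // "a + b + 42" with a = "ABCD", b = 3, evaluated left to right
        System.out.println(plus(plus("ABCD", 3), 42)); // prints "ABCD342"
        // "42 + b + a" with b = 3, a = "ABCD": numeric addition happens first
        System.out.println(plus(plus(42, 3), "ABCD")); // prints "45ABCD"
    }
}
```

Because the expression is evaluated left to right, `42 + b` is still a numeric addition in the second case, which is why `testStringConcat1` expects `45ABCD` and `44ABCD` rather than `423ABCD` and `422ABCD`.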