You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/07/13 21:43:15 UTC

[1/4] flink git commit: [FLINK-6693] [table] Support DATE_FORMAT function in the Table / SQL API.

Repository: flink
Updated Branches:
  refs/heads/master cf791bd50 -> 94d3166b4


[FLINK-6693] [table] Support DATE_FORMAT function in the Table / SQL API.

This closes #4078.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac8476c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac8476c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac8476c

Branch: refs/heads/master
Commit: 2ac8476c2d7008445302a966b5adafac6f3e4a09
Parents: cf791bd
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Jun 5 23:23:31 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 13 18:54:36 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 180 ++++++++++++++++---
 flink-libraries/flink-table/pom.xml             |   5 +
 .../flink/table/codegen/CodeGenerator.scala     |  25 +++
 .../table/codegen/calls/DateFormatCallGen.scala |  44 +++++
 .../table/codegen/calls/FunctionGenerator.scala |   8 +
 .../apache/flink/table/expressions/time.scala   |  14 +-
 .../functions/sql/DateTimeSqlFunction.scala     |  34 ++++
 .../runtime/functions/DateTimeFunctions.scala   | 118 ++++++++++++
 .../runtime/functions/ThreadLocalCache.scala    |  49 +++++
 .../flink/table/validate/FunctionCatalog.scala  |   4 +-
 .../expressions/DateTimeFunctionTest.scala      |  66 +++++++
 11 files changed, 517 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3b175a9..467732a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -245,8 +245,8 @@ SELECT PRETTY_PRINT(user) FROM Orders
         <p><b>Note:</b> GroupBy on a streaming table produces an updating result. See the <a href="streaming.html">Streaming Concepts</a> page for details.
         </p>
 {% highlight sql %}
-SELECT a, SUM(b) as d 
-FROM Orders 
+SELECT a, SUM(b) as d
+FROM Orders
 GROUP BY a
 {% endhighlight %}
       </td>
@@ -259,8 +259,8 @@ GROUP BY a
     	<td>
         <p>Use a group window to compute a single result row per group. See <a href="#group-windows">Group Windows</a> section for more details.</p>
 {% highlight sql %}
-SELECT user, SUM(amount) 
-FROM Orders 
+SELECT user, SUM(amount)
+FROM Orders
 GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
 {% endhighlight %}
       </td>
@@ -274,9 +274,9 @@ GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
         <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming.html#time-attributes">time attribute</a></p>
 {% highlight sql %}
 SELECT COUNT(amount) OVER (
-  PARTITION BY user 
-  ORDER BY proctime 
-  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) 
+  PARTITION BY user
+  ORDER BY proctime
+  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
 FROM Orders
 {% endhighlight %}
       </td>
@@ -301,8 +301,8 @@ SELECT DISTINCT users FROM Orders
       </td>
       <td>
 {% highlight sql %}
-SELECT SUM(amount) 
-FROM Orders 
+SELECT SUM(amount)
+FROM Orders
 GROUP BY GROUPING SETS ((user), (product))
 {% endhighlight %}
       </td>
@@ -314,9 +314,9 @@ GROUP BY GROUPING SETS ((user), (product))
       </td>
       <td>
 {% highlight sql %}
-SELECT SUM(amount) 
-FROM Orders 
-GROUP BY users 
+SELECT SUM(amount)
+FROM Orders
+GROUP BY users
 HAVING SUM(amount) > 50
 {% endhighlight %}
       </td>
@@ -329,8 +329,8 @@ HAVING SUM(amount) > 50
       <td>
         <p>UDAGGs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDAGGs.</p>
 {% highlight sql %}
-SELECT MyAggregate(amount) 
-FROM Orders 
+SELECT MyAggregate(amount)
+FROM Orders
 GROUP BY users
 {% endhighlight %}
       </td>
@@ -360,10 +360,10 @@ GROUP BY users
         <p>Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.</p>
         <p><b>Note:</b> The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.</p>
 {% highlight sql %}
-SELECT * 
+SELECT *
 FROM Orders INNER JOIN Product ON Orders.productId = Product.id
 
-SELECT * 
+SELECT *
 FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
 {% endhighlight %}
       </td>
@@ -376,7 +376,7 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
     	<td>
         <p>Unnesting WITH ORDINALITY is not supported yet.</p>
 {% highlight sql %}
-SELECT users, tag 
+SELECT users, tag
 FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
 {% endhighlight %}
       </td>
@@ -389,7 +389,7 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
     	<td>
       <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
 {% highlight sql %}
-SELECT users, tag 
+SELECT users, tag
 FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag
 {% endhighlight %}
       </td>
@@ -418,7 +418,7 @@ FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag
       </td>
       <td>
 {% highlight sql %}
-SELECT * 
+SELECT *
 FROM (
     (SELECT user FROM Orders WHERE a % 2 = 0)
   UNION
@@ -434,7 +434,7 @@ FROM (
       </td>
       <td>
 {% highlight sql %}
-SELECT * 
+SELECT *
 FROM (
     (SELECT user FROM Orders WHERE a % 2 = 0)
   UNION ALL
@@ -451,7 +451,7 @@ FROM (
       </td>
       <td>
 {% highlight sql %}
-SELECT * 
+SELECT *
 FROM (
     (SELECT user FROM Orders WHERE a % 2 = 0)
   INTERSECT
@@ -459,7 +459,7 @@ FROM (
 )
 {% endhighlight %}
 {% highlight sql %}
-SELECT * 
+SELECT *
 FROM (
     (SELECT user FROM Orders WHERE a % 2 = 0)
   EXCEPT
@@ -491,11 +491,11 @@ FROM (
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported. 
+<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported.
 
 {% highlight sql %}
-SELECT * 
-FROM Orders 
+SELECT *
+FROM Orders
 ORDER BY orderTime
 {% endhighlight %}
       </td>
@@ -507,8 +507,8 @@ ORDER BY orderTime
       </td>
       <td>
 {% highlight sql %}
-SELECT * 
-FROM Orders 
+SELECT *
+FROM Orders
 LIMIT 3
 {% endhighlight %}
       </td>
@@ -551,7 +551,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 
 #### Time Attributes
 
-For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes. 
+For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes.
 
 For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
@@ -1908,6 +1908,130 @@ QUARTER(date)
         <p>Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. E.g. <code>(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR)</code> leads to true; <code>(TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR)</code> leads to false.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+DATE_FORMAT(timestamp, format)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Formats timestamp as a string using format. The format is compatible with the one used by <code>date_parse</code> and <code>str_to_date</code> in MySQL. The following table describes the specified. </p>  
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+#### MySQL date format specifier
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Specifier</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr><td>{% highlight text %}%a{% endhighlight %}</td>
+  <td>Abbreviated weekday name (<code>Sun</code> .. <code>Sat</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%b{% endhighlight %}</td>
+  <td>Abbreviated month name (<code>Jan</code> .. <code>Dec</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%c{% endhighlight %}</td>
+  <td>Month, numeric (<code>1</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%D{% endhighlight %}</td>
+  <td>Day of the month with English suffix (<code>0th</code>, <code>1st</code>, <code>2nd</code>, <code>3rd</code>, ...)</td>
+  </tr>
+  <tr><td>{% highlight text %}%d{% endhighlight %}</td>
+  <td>Day of the month, numeric (<code>01</code> .. <code>31</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%e{% endhighlight %}</td>
+  <td>Day of the month, numeric (<code>1</code> .. <code>31</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%f{% endhighlight %}</td>
+  <td>Fraction of second (6 digits for printing: <code>000000</code> .. <code>999000</code>; 1 - 9 digits for parsing: <code>0</code> .. <code>999999999</code>) (Timestamp is truncated to milliseconds.) </td>
+  </tr>
+  <tr><td>{% highlight text %}%H{% endhighlight %}</td>
+  <td>Hour (<code>00</code> .. <code>23</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%h{% endhighlight %}</td>
+  <td>Hour (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%I{% endhighlight %}</td>
+  <td>Hour (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%i{% endhighlight %}</td>
+  <td>Minutes, numeric (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%j{% endhighlight %}</td>
+  <td>Day of year (<code>001</code> .. <code>366</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%k{% endhighlight %}</td>
+  <td>Hour (<code>0</code> .. <code>23</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%l{% endhighlight %}</td>
+  <td>Hour (<code>1</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%M{% endhighlight %}</td>
+  <td>Month name (<code>January</code> .. <code>December</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%m{% endhighlight %}</td>
+  <td>Month, numeric (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%p{% endhighlight %}</td>
+  <td><code>AM</code> or <code>PM</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%r{% endhighlight %}</td>
+  <td>Time, 12-hour (<code>hh:mm:ss</code> followed by <code>AM</code> or <code>PM</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%S{% endhighlight %}</td>
+  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%s{% endhighlight %}</td>
+  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%T{% endhighlight %}</td>
+  <td>Time, 24-hour (<code>hh:mm:ss</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%U{% endhighlight %}</td>
+  <td>Week (<code>00</code> .. <code>53</code>), where Sunday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%u{% endhighlight %}</td>
+  <td>Week (<code>00</code> .. <code>53</code>), where Monday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%V{% endhighlight %}</td>
+  <td>Week (<code>01</code> .. <code>53</code>), where Sunday is the first day of the week; used with <code>%X</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%v{% endhighlight %}</td>
+  <td>Week (<code>01</code> .. <code>53</code>), where Monday is the first day of the week; used with <code>%x</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
+  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
+  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%w{% endhighlight %}</td>
+  <td>Day of the week (<code>0</code> .. <code>6</code>), where Sunday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%X{% endhighlight %}</td>
+  <td>Year for the week where Sunday is the first day of the week, numeric, four digits; used with <code>%V</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
+  <td>Year for the week, where Monday is the first day of the week, numeric, four digits; used with <code>%v</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%Y{% endhighlight %}</td>
+  <td>Year, numeric, four digits</td>
+  </tr>
+  <tr><td>{% highlight text %}%y{% endhighlight %}</td>
+  <td>Year, numeric (two digits) </td>
+  </tr>
+  <tr><td>{% highlight text %}%%{% endhighlight %}</td>
+  <td>A literal <code>%</code> character</td>
+  </tr>
+  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
+  <td><code>x</code>, for any <code>x</code> not listed above</td>
+  </tr>
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 53b8dbb..8a7e3ac 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -110,6 +110,11 @@ under the License.
             <scope>compile</scope>
         </dependency>
 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 045fbdd..e8bcdcf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -2008,6 +2008,31 @@ class CodeGenerator(
   }
 
   /**
+    * Adds a reusable DateFormatter to the member area of the generated [[Function]].
+    *
+    * @return member variable term
+    */
+  def addReusableDateFormatter(format: GeneratedExpression): String = {
+    val fieldTerm = newName("dateFormatter")
+
+    val field =
+      s"""
+         |transient org.joda.time.format.DateTimeFormatter $fieldTerm;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+
+    val fieldInit =
+      s"""
+         |${format.code}
+         |$fieldTerm = org.apache.flink.table.runtime.functions.
+         |DateTimeFunctions$$.MODULE$$.createDateTimeFormatter(${format.resultTerm});
+         |""".stripMargin
+
+    reusableInitStatements.add(fieldInit)
+    fieldTerm
+  }
+
+  /**
     * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
     *
     * @param function [[UserDefinedFunction]] object to be instantiated during runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala
new file mode 100644
index 0000000..ba1e294
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/DateFormatCallGen.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+
+class DateFormatCallGen extends CallGenerator {
+  override def generate(codeGenerator: CodeGenerator,
+                        operands: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+
+    if (operands.last.literal) {
+      val formatter = codeGenerator.addReusableDateFormatter(operands.last)
+      generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
+        terms => s"$formatter.print(${terms.head})"
+      }
+    } else {
+      generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
+        terms => s"""
+          |org.apache.flink.table.runtime.functions.
+          |DateTimeFunctions$$.MODULE$$.dateFormat(${terms.head}, ${terms.last});
+          """.stripMargin
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 4da5514..d071279 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -28,8 +28,10 @@ import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.table.functions.sql.DateTimeSqlFunction
 import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions._
+
 import scala.collection.mutable
 
 /**
@@ -482,6 +484,12 @@ object FunctionGenerator {
     Seq(),
     new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
+  addSqlFunction(
+    DateTimeSqlFunction.DATE_FORMAT,
+    Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO),
+    new DateFormatCallGen
+  )
+
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index f09e2ad..250ec0a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -28,9 +28,11 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.calcite.FlinkRelBuilder
 import org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, mod}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.functions.sql.DateTimeSqlFunction
+import org.apache.flink.table.runtime.functions.DateTimeFunctions
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
-import org.apache.flink.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 import scala.collection.JavaConversions._
 
@@ -375,3 +377,13 @@ case class TemporalOverlaps(
   }
 }
 
+case class DateFormat(timestamp: Expression, format: Expression) extends Expression {
+  override private[flink] def children = timestamp :: format :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(DateTimeSqlFunction.DATE_FORMAT, timestamp.toRexNode, format.toRexNode)
+
+  override def toString: String = s"$timestamp.dateFormat($format)"
+
+  override private[flink] def resultType = STRING_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala
new file mode 100644
index 0000000..22de126
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/DateTimeSqlFunction.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.sql
+
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+
+
+object DateTimeSqlFunction {
+  val DATE_FORMAT = new SqlFunction(
+    "DATE_FORMAT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.explicit(SqlTypeName.VARCHAR),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING),
+    SqlFunctionCategory.TIMEDATE
+  )
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
new file mode 100644
index 0000000..d69a5c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import org.joda.time.format.DateTimeFormatter
+import org.joda.time.format.DateTimeFormatterBuilder
+
+object DateTimeFunctions {
+  private val PIVOT_YEAR = 2020
+
+  private val DATETIME_FORMATTER_CACHE = new ThreadLocalCache[String, DateTimeFormatter](64) {
+    protected override def getNewInstance(format: String): DateTimeFormatter
+    = createDateTimeFormatter(format)
+  }
+
+  def dateFormat(ts: Long, formatString: String): String = {
+    val formatter = DATETIME_FORMATTER_CACHE.get(formatString)
+    formatter.print(ts)
+  }
+
+  def createDateTimeFormatter(format: String): DateTimeFormatter = {
+    val builder = new DateTimeFormatterBuilder
+    var escaped = false
+    var i = 0
+    while (i < format.length) {
+      val character = format.charAt(i)
+      i = i + 1
+      if (escaped) {
+        character match {
+          // %a Abbreviated weekday name (Sun..Sat)
+          case 'a' => builder.appendDayOfWeekShortText
+          // %b Abbreviated month name (Jan..Dec)
+          case 'b' => builder.appendMonthOfYearShortText
+          // %c Month, numeric (0..12)
+          case 'c' => builder.appendMonthOfYear(1)
+          // %d Day of the month, numeric (00..31)
+          case 'd' => builder.appendDayOfMonth(2)
+          // %e Day of the month, numeric (0..31)
+          case 'e' => builder.appendDayOfMonth(1)
+          // %f Microseconds (000000..999999)
+          case 'f' => builder.appendFractionOfSecond(6, 9)
+          // %H Hour (00..23)
+          case 'H' => builder.appendHourOfDay(2)
+          case 'h' | 'I' => // %h Hour (01..12)
+            builder.appendClockhourOfHalfday(2)
+          // %i Minutes, numeric (00..59)
+          case 'i' => builder.appendMinuteOfHour(2)
+          // %j Day of year (001..366)
+          case 'j' => builder.appendDayOfYear(3)
+          // %k Hour (0..23)
+          case 'k' => builder.appendHourOfDay(1)
+          // %l Hour (1..12)
+          case 'l' => builder.appendClockhourOfHalfday(1)
+          // %M Month name (January..December)
+          case 'M' => builder.appendMonthOfYearText
+          // %m Month, numeric (00..12)
+          case 'm' => builder.appendMonthOfYear(2)
+          // %p AM or PM
+          case 'p' => builder.appendHalfdayOfDayText
+          // %r Time, 12-hour (hh:mm:ss followed by AM or PM)
+          case 'r' => builder.appendClockhourOfHalfday(2).appendLiteral(':').
+            appendMinuteOfHour(2).appendLiteral(':').appendSecondOfMinute(2).
+            appendLiteral(' ').appendHalfdayOfDayText
+          // %S Seconds (00..59)
+          case 'S' | 's' => builder.appendSecondOfMinute(2)
+          // %T Time, 24-hour (hh:mm:ss)
+          case 'T' => builder.appendHourOfDay(2).appendLiteral(':').
+            appendMinuteOfHour(2).appendLiteral(':').appendSecondOfMinute(2)
+          // %v Week (01..53), where Monday is the first day of the week; used with %x
+          case 'v' => builder.appendWeekOfWeekyear(2)
+          // %x Year for the week, where Monday is the first day of the week, numeric,
+          // four digits; used with %v
+          case 'x' => builder.appendWeekyear(4, 4)
+          // %W Weekday name (Sunday..Saturday)
+          case 'W' => builder.appendDayOfWeekText
+          // %Y Year, numeric, four digits
+          case 'Y' => builder.appendYear(4, 4)
+          // %y Year, numeric (two digits)
+          case 'y' => builder.appendTwoDigitYear(PIVOT_YEAR)
+
+          // %w Day of the week (0=Sunday..6=Saturday)
+          // %U Week (00..53), where Sunday is the first day of the week
+          // %u Week (00..53), where Monday is the first day of the week
+          // %V Week (01..53), where Sunday is the first day of the week; used with %X
+          // %X Year for the week where Sunday is the first day of the
+          // week, numeric, four digits; used with %V
+          // %D Day of the month with English suffix (0th, 1st, 2nd, 3rd, ...)
+          case 'w' | 'U' | 'u' | 'V' | 'X' | 'D' =>
+            throw new UnsupportedOperationException(
+              s"%%$character not supported in date format string")
+          // %% A literal "%" character
+          case '%' => builder.appendLiteral('%')
+          // %<x> The literal character represented by <x>
+          case _ => builder.appendLiteral(character)
+        }
+        escaped = false
+      }
+      else if (character == '%') { escaped = true }
+      else { builder.appendLiteral(character) }
+    }
+    builder.toFormatter
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
new file mode 100644
index 0000000..b3a8d7a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import java.util.{LinkedHashMap => JLinkedHashMap}
+import java.util.{Map => JMap}
+
+/**
+  * Provides a ThreadLocal cache with a maximum cache size per thread.
+  * Values must not be null.
+  */
+abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) {
+  private val cache = new ThreadLocal[BoundedMap[K, V]]
+
+  protected def getNewInstance(key: K): V
+
+  def get(key: K): V = {
+    var m = cache.get
+    if (m == null) {
+      m = new BoundedMap(maxSizePerThread)
+      cache.set(m)
+    }
+    var v = m.get(key)
+    if (v == null) {
+      v = getNewInstance(key)
+      m.put(key, v)
+    }
+    v
+  }
+}
+
+private class BoundedMap[K, V](val maxSize: Int) extends JLinkedHashMap[K,V] {
+  override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size > maxSize
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index b7bfa9b..b165ecb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
 import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.sql.ScalarSqlFunctions
+import org.apache.flink.table.functions.sql.{DateTimeSqlFunction, ScalarSqlFunctions}
 import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 
@@ -239,6 +239,7 @@ object FunctionCatalog {
     "quarter" -> classOf[Quarter],
     "temporalOverlaps" -> classOf[TemporalOverlaps],
     "dateTimePlus" -> classOf[Plus],
+    "dateFormat" -> classOf[DateFormat],
 
     // array
     "array" -> classOf[ArrayConstructor],
@@ -376,6 +377,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.CURRENT_TIME,
     SqlStdOperatorTable.CURRENT_TIMESTAMP,
     SqlStdOperatorTable.CURRENT_DATE,
+    DateTimeSqlFunction.DATE_FORMAT,
     SqlStdOperatorTable.CAST,
     SqlStdOperatorTable.EXTRACT,
     SqlStdOperatorTable.QUARTER,

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac8476c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
new file mode 100644
index 0000000..5eb3417
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
+import org.joda.time.{DateTime, DateTimeZone}
+import org.junit.Test
+
+class DateTimeFunctionTest extends ExpressionTestBase {
+  private val INSTANT = DateTime.parse("1990-01-02T03:04:05.678Z")
+  private val LOCAL_ZONE = DateTimeZone.getDefault
+  private val LOCAL_TIME = INSTANT.toDateTime(LOCAL_ZONE)
+
+  @Test
+  def testDateFormat(): Unit = {
+    val expected = LOCAL_TIME.toString("MM/dd/yyyy HH:mm:ss.SSSSSS")
+    testAllApis(
+      DateFormat('f0, "%m/%d/%Y %H:%i:%s.%f"),
+      "dateFormat(f0, \"%m/%d/%Y %H:%i:%s.%f\")",
+      "DATE_FORMAT(f0, '%m/%d/%Y %H:%i:%s.%f')",
+      expected)
+  }
+
+  @Test
+  def testDateFormatNonConstantFormatter(): Unit = {
+    val expected = LOCAL_TIME.toString("MM/dd/yyyy")
+    testAllApis(
+      DateFormat('f0, 'f1),
+      "dateFormat(f0, f1)",
+      "DATE_FORMAT(f0, f1)",
+      expected)
+  }
+
+  override def testData: Any = {
+    val testData = new Row(2)
+    // SQL expect a timestamp in the local timezone
+    testData.setField(0, new Timestamp(LOCAL_ZONE.convertLocalToUTC(INSTANT.getMillis, true)))
+    testData.setField(1, "%m/%d/%Y")
+    testData
+  }
+
+  override def typeInfo: TypeInformation[Any] =
+    new RowTypeInfo(Types.SQL_TIMESTAMP, Types.STRING).asInstanceOf[TypeInformation[Any]]
+}


[2/4] flink git commit: [FLINK-6693] [table] Support dateFormat() function in Scala Table API.

Posted by fh...@apache.org.
[FLINK-6693] [table] Support dateFormat() function in Scala Table API.

- Add dateFormat() to Table API documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3dbbeedb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3dbbeedb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3dbbeedb

Branch: refs/heads/master
Commit: 3dbbeedb961f19b5ac4fe2d1d28ebb77af986d31
Parents: 2ac8476
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jul 12 16:45:36 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 13 18:54:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 234 ++++++++++---------
 docs/dev/table/tableApi.md                      |  28 ++-
 .../flink/table/api/scala/expressionDsl.scala   |  29 +++
 .../expressions/DateTimeFunctionTest.scala      |   4 +-
 4 files changed, 174 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3dbbeedb/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 467732a..81aa3d9 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1915,126 +1915,13 @@ DATE_FORMAT(timestamp, format)
 {% endhighlight %}
       </td>
       <td>
-        <p>Formats timestamp as a string using format. The format is compatible with the one used by <code>date_parse</code> and <code>str_to_date</code> in MySQL. The following table describes the specified. </p>  
+        <p>Formats <code>timestamp</code> as a string using a specified <code>format</code>. The format must be compatible with MySQL's date formatting syntax as used by the <code>date_parse</code> function. The format specification is given in the <a href="#date-format-specifier">Date Format Specifier table</a> below.</p>
+        <p>For example <code>DATE_FORMAT(ts, '%Y, %d %M')</code> results in strings formatted as <code>"2017, 05 May"</code>.</p>
       </td>
     </tr>
   </tbody>
 </table>
 
-#### MySQL date format specifier
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Specifier</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr><td>{% highlight text %}%a{% endhighlight %}</td>
-  <td>Abbreviated weekday name (<code>Sun</code> .. <code>Sat</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%b{% endhighlight %}</td>
-  <td>Abbreviated month name (<code>Jan</code> .. <code>Dec</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%c{% endhighlight %}</td>
-  <td>Month, numeric (<code>1</code> .. <code>12</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%D{% endhighlight %}</td>
-  <td>Day of the month with English suffix (<code>0th</code>, <code>1st</code>, <code>2nd</code>, <code>3rd</code>, ...)</td>
-  </tr>
-  <tr><td>{% highlight text %}%d{% endhighlight %}</td>
-  <td>Day of the month, numeric (<code>01</code> .. <code>31</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%e{% endhighlight %}</td>
-  <td>Day of the month, numeric (<code>1</code> .. <code>31</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%f{% endhighlight %}</td>
-  <td>Fraction of second (6 digits for printing: <code>000000</code> .. <code>999000</code>; 1 - 9 digits for parsing: <code>0</code> .. <code>999999999</code>) (Timestamp is truncated to milliseconds.) </td>
-  </tr>
-  <tr><td>{% highlight text %}%H{% endhighlight %}</td>
-  <td>Hour (<code>00</code> .. <code>23</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%h{% endhighlight %}</td>
-  <td>Hour (<code>01</code> .. <code>12</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%I{% endhighlight %}</td>
-  <td>Hour (<code>01</code> .. <code>12</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%i{% endhighlight %}</td>
-  <td>Minutes, numeric (<code>00</code> .. <code>59</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%j{% endhighlight %}</td>
-  <td>Day of year (<code>001</code> .. <code>366</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%k{% endhighlight %}</td>
-  <td>Hour (<code>0</code> .. <code>23</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%l{% endhighlight %}</td>
-  <td>Hour (<code>1</code> .. <code>12</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%M{% endhighlight %}</td>
-  <td>Month name (<code>January</code> .. <code>December</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%m{% endhighlight %}</td>
-  <td>Month, numeric (<code>01</code> .. <code>12</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%p{% endhighlight %}</td>
-  <td><code>AM</code> or <code>PM</code></td>
-  </tr>
-  <tr><td>{% highlight text %}%r{% endhighlight %}</td>
-  <td>Time, 12-hour (<code>hh:mm:ss</code> followed by <code>AM</code> or <code>PM</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%S{% endhighlight %}</td>
-  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%s{% endhighlight %}</td>
-  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%T{% endhighlight %}</td>
-  <td>Time, 24-hour (<code>hh:mm:ss</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%U{% endhighlight %}</td>
-  <td>Week (<code>00</code> .. <code>53</code>), where Sunday is the first day of the week</td>
-  </tr>
-  <tr><td>{% highlight text %}%u{% endhighlight %}</td>
-  <td>Week (<code>00</code> .. <code>53</code>), where Monday is the first day of the week</td>
-  </tr>
-  <tr><td>{% highlight text %}%V{% endhighlight %}</td>
-  <td>Week (<code>01</code> .. <code>53</code>), where Sunday is the first day of the week; used with <code>%X</code></td>
-  </tr>
-  <tr><td>{% highlight text %}%v{% endhighlight %}</td>
-  <td>Week (<code>01</code> .. <code>53</code>), where Monday is the first day of the week; used with <code>%x</code></td>
-  </tr>
-  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
-  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
-  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
-  </tr>
-  <tr><td>{% highlight text %}%w{% endhighlight %}</td>
-  <td>Day of the week (<code>0</code> .. <code>6</code>), where Sunday is the first day of the week</td>
-  </tr>
-  <tr><td>{% highlight text %}%X{% endhighlight %}</td>
-  <td>Year for the week where Sunday is the first day of the week, numeric, four digits; used with <code>%V</code></td>
-  </tr>
-  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
-  <td>Year for the week, where Monday is the first day of the week, numeric, four digits; used with <code>%v</code></td>
-  </tr>
-  <tr><td>{% highlight text %}%Y{% endhighlight %}</td>
-  <td>Year, numeric, four digits</td>
-  </tr>
-  <tr><td>{% highlight text %}%y{% endhighlight %}</td>
-  <td>Year, numeric (two digits) </td>
-  </tr>
-  <tr><td>{% highlight text %}%%{% endhighlight %}</td>
-  <td>A literal <code>%</code> character</td>
-  </tr>
-  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
-  <td><code>x</code>, for any <code>x</code> not listed above</td>
-  </tr>
-  </tbody>
-</table>
-
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -2287,4 +2174,121 @@ A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, A
 
 {% endhighlight %}
 
+#### Date Format Specifier
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Specifier</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr><td>{% highlight text %}%a{% endhighlight %}</td>
+  <td>Abbreviated weekday name (<code>Sun</code> .. <code>Sat</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%b{% endhighlight %}</td>
+  <td>Abbreviated month name (<code>Jan</code> .. <code>Dec</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%c{% endhighlight %}</td>
+  <td>Month, numeric (<code>1</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%D{% endhighlight %}</td>
+  <td>Day of the month with English suffix (<code>0th</code>, <code>1st</code>, <code>2nd</code>, <code>3rd</code>, ...)</td>
+  </tr>
+  <tr><td>{% highlight text %}%d{% endhighlight %}</td>
+  <td>Day of the month, numeric (<code>01</code> .. <code>31</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%e{% endhighlight %}</td>
+  <td>Day of the month, numeric (<code>1</code> .. <code>31</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%f{% endhighlight %}</td>
+  <td>Fraction of second (6 digits for printing: <code>000000</code> .. <code>999000</code>; 1 - 9 digits for parsing: <code>0</code> .. <code>999999999</code>) (Timestamp is truncated to milliseconds.) </td>
+  </tr>
+  <tr><td>{% highlight text %}%H{% endhighlight %}</td>
+  <td>Hour (<code>00</code> .. <code>23</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%h{% endhighlight %}</td>
+  <td>Hour (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%I{% endhighlight %}</td>
+  <td>Hour (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%i{% endhighlight %}</td>
+  <td>Minutes, numeric (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%j{% endhighlight %}</td>
+  <td>Day of year (<code>001</code> .. <code>366</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%k{% endhighlight %}</td>
+  <td>Hour (<code>0</code> .. <code>23</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%l{% endhighlight %}</td>
+  <td>Hour (<code>1</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%M{% endhighlight %}</td>
+  <td>Month name (<code>January</code> .. <code>December</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%m{% endhighlight %}</td>
+  <td>Month, numeric (<code>01</code> .. <code>12</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%p{% endhighlight %}</td>
+  <td><code>AM</code> or <code>PM</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%r{% endhighlight %}</td>
+  <td>Time, 12-hour (<code>hh:mm:ss</code> followed by <code>AM</code> or <code>PM</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%S{% endhighlight %}</td>
+  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%s{% endhighlight %}</td>
+  <td>Seconds (<code>00</code> .. <code>59</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%T{% endhighlight %}</td>
+  <td>Time, 24-hour (<code>hh:mm:ss</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%U{% endhighlight %}</td>
+  <td>Week (<code>00</code> .. <code>53</code>), where Sunday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%u{% endhighlight %}</td>
+  <td>Week (<code>00</code> .. <code>53</code>), where Monday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%V{% endhighlight %}</td>
+  <td>Week (<code>01</code> .. <code>53</code>), where Sunday is the first day of the week; used with <code>%X</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%v{% endhighlight %}</td>
+  <td>Week (<code>01</code> .. <code>53</code>), where Monday is the first day of the week; used with <code>%x</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
+  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%W{% endhighlight %}</td>
+  <td>Weekday name (<code>Sunday</code> .. <code>Saturday</code>)</td>
+  </tr>
+  <tr><td>{% highlight text %}%w{% endhighlight %}</td>
+  <td>Day of the week (<code>0</code> .. <code>6</code>), where Sunday is the first day of the week</td>
+  </tr>
+  <tr><td>{% highlight text %}%X{% endhighlight %}</td>
+  <td>Year for the week where Sunday is the first day of the week, numeric, four digits; used with <code>%V</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
+  <td>Year for the week, where Monday is the first day of the week, numeric, four digits; used with <code>%v</code></td>
+  </tr>
+  <tr><td>{% highlight text %}%Y{% endhighlight %}</td>
+  <td>Year, numeric, four digits</td>
+  </tr>
+  <tr><td>{% highlight text %}%y{% endhighlight %}</td>
+  <td>Year, numeric (two digits) </td>
+  </tr>
+  <tr><td>{% highlight text %}%%{% endhighlight %}</td>
+  <td>A literal <code>%</code> character</td>
+  </tr>
+  <tr><td>{% highlight text %}%x{% endhighlight %}</td>
+  <td><code>x</code>, for any <code>x</code> not listed above</td>
+  </tr>
+  </tbody>
+</table>
+
 {% top %}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/3dbbeedb/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index fe057cc..5ac49d4 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1466,8 +1466,6 @@ Composite types, however, are fully supported types where fields of a composite
 
 Array types can be accessed using the `myArray.at(1)` operator in Table API and `myArray[1]` operator in SQL. Array literals can be created using `array(1, 2, 3)` in Table API and `ARRAY[1, 2, 3]` in SQL.
 
-**TODO: Clean-up and move relevant parts to the "Mappings Types to Table Schema" section of the Common Concepts & API page.**
-
 {% top %}
 
 Expression Syntax
@@ -1542,8 +1540,6 @@ In order to work with temporal values the Table API supports Java SQL's Date, Ti
 
 Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
 
-**TODO: needs to be reworked, IMO. Grammar might be complete but is hard to understand without concrete examples**
-
 {% top %}
 
 Built-In Functions
@@ -2593,6 +2589,18 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+dateFormat(TIMESTAMP, STRING)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Formats <code>timestamp</code> as a string using a specified <code>format</code>. The format must be compatible with MySQL's date formatting syntax as used by the <code>date_parse</code> function. The format specification is given in the <a href="sql.html#date-format-specifier">Date Format Specifier table</a> below.</p>
+        <p>For example <code>dateFormat(ts, '%Y, %d %M')</code> results in strings formatted as <code>"2017, 05 May"</code>.</p>
+      </td>
+    </tr>
+
     </tbody>
 </table>
 
@@ -4013,6 +4021,18 @@ ANY.flatten()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+dateFormat(TIMESTAMP, STRING)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Formats <code>timestamp</code> as a string using a specified <code>format</code>. The format must be compatible with MySQL's date formatting syntax as used by the <code>date_parse</code> function. The format specification is given in the <a href="sql.html#date-format-specifier">Date Format Specifier table</a> below.</p>
+        <p>For example <code>dateFormat('ts, "%Y, %d %M")</code> results in strings formatted as <code>"2017, 05 May"</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3dbbeedb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 563dc31..13c03d7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -898,6 +898,35 @@ object temporalOverlaps {
 }
 
 /**
+  * Formats a timestamp as a string using a specified format.
+  * The format must be compatible with MySQL's date formatting syntax as used by the
+  * date_parse function.
+  *
+  * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings
+  * formatted as "2017, 05 May".
+  */
+object dateFormat {
+
+  /**
+    * Formats a timestamp as a string using a specified format.
+    * The format must be compatible with MySQL's date formatting syntax as used by the
+    * date_parse function.
+    *
+    * For example dataFormat('time, "%Y, %d %M") results in strings formatted as "2017, 05 May".
+    *
+    * @param timestamp The timestamp to format as string.
+    * @param format The format of the string.
+    * @return The formatted timestamp as string.
+    */
+  def apply(
+    timestamp: Expression,
+    format: Expression
+  ): Expression = {
+    DateFormat(timestamp, format)
+  }
+}
+
+/**
   * Creates an array of literals. The array will be an array of objects (not primitives).
   */
 object array {

http://git-wip-us.apache.org/repos/asf/flink/blob/3dbbeedb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
index 5eb3417..a043543 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
@@ -37,7 +37,7 @@ class DateTimeFunctionTest extends ExpressionTestBase {
   def testDateFormat(): Unit = {
     val expected = LOCAL_TIME.toString("MM/dd/yyyy HH:mm:ss.SSSSSS")
     testAllApis(
-      DateFormat('f0, "%m/%d/%Y %H:%i:%s.%f"),
+      dateFormat('f0, "%m/%d/%Y %H:%i:%s.%f"),
       "dateFormat(f0, \"%m/%d/%Y %H:%i:%s.%f\")",
       "DATE_FORMAT(f0, '%m/%d/%Y %H:%i:%s.%f')",
       expected)
@@ -47,7 +47,7 @@ class DateTimeFunctionTest extends ExpressionTestBase {
   def testDateFormatNonConstantFormatter(): Unit = {
     val expected = LOCAL_TIME.toString("MM/dd/yyyy")
     testAllApis(
-      DateFormat('f0, 'f1),
+      dateFormat('f0, 'f1),
       "dateFormat(f0, f1)",
       "DATE_FORMAT(f0, f1)",
       expected)


[4/4] flink git commit: [hotfix] [table] Fix typo in DataStreamRel ScalaDocs.

Posted by fh...@apache.org.
[hotfix] [table] Fix typo in DataStreamRel ScalaDocs.

This closes #4303.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a50a9bbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a50a9bbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a50a9bbe

Branch: refs/heads/master
Commit: a50a9bbe961652d8fd7fe159c138ea83296ecf05
Parents: 3dbbeed
Author: Ken Geis <ge...@gmail.com>
Authored: Tue Jul 11 12:22:58 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 13 18:54:37 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/plan/nodes/datastream/DataStreamRel.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a50a9bbe/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 65d336f..fc5570d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -48,7 +48,7 @@ trait DataStreamRel extends FlinkRelNode {
   def producesUpdates: Boolean = false
 
   /**
-    * Wheter the [[DataStreamRel]] consumes retraction messages instead of forwarding them.
+    * Whether the [[DataStreamRel]] consumes retraction messages instead of forwarding them.
     * The node might or might not produce new retraction messages.
     */
   def consumesRetractions: Boolean = false


[3/4] flink git commit: [FLINK-7154] [docs] Missing call to build CsvTableSource example

Posted by fh...@apache.org.
[FLINK-7154] [docs] Missing call to build CsvTableSource example

The Java and Scala example code for CsvTableSource create a builder but
are missing the final call to build.

This closes #4313.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94d3166b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94d3166b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94d3166b

Branch: refs/heads/master
Commit: 94d3166b474ca4b4270ee87725ee0f9a08c8bd56
Parents: a50a9bb
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jul 11 14:21:50 2017 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 13 18:54:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94d3166b/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 58049c8..7af74ca 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -173,7 +173,8 @@ CsvTableSource csvTableSource = CsvTableSource
     .lineDelimiter("$")
     .ignoreFirstLine()
     .ignoreParseErrors()
-    .commentPrefix("%");
+    .commentPrefix("%")
+    .build();
 {% endhighlight %}
 </div>
 
@@ -191,6 +192,7 @@ val csvTableSource = CsvTableSource
     .ignoreFirstLine
     .ignoreParseErrors
     .commentPrefix("%")
+    .build
 {% endhighlight %}
 </div>
 </div>