You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/09 15:24:08 UTC
flink git commit: [FLINK-3580] [table] Add QUARTER function
Repository: flink
Updated Branches:
refs/heads/master 8c0d62433 -> 9420a775f
[FLINK-3580] [table] Add QUARTER function
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9420a775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9420a775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9420a775
Branch: refs/heads/master
Commit: 9420a775f4689ec57c9026b418c5961c33263265
Parents: 8c0d624
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 9 17:17:44 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 9 17:22:22 2016 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 28 +++++++++++++++++--
.../flink/api/scala/table/expressionDsl.scala | 7 +++++
.../table/codegen/calls/ScalarFunctions.scala | 6 ++++
.../flink/api/table/expressions/time.scala | 29 ++++++++++++++++++++
.../api/table/validate/FunctionCatalog.scala | 3 +-
.../table/expressions/ScalarFunctionsTest.scala | 21 ++++++++++++++
6 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 59998f0..b419bfb 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1520,7 +1520,18 @@ TEMPORAL.extract(TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
- <p>Extracts parts of a time point or time interval. Returns the part as a long value. E.g. <code>"2006-06-05".toDate.extract(DAY)</code> leads to 5.</p>
+ <p>Extracts parts of a time point or time interval. Returns the part as a long value. E.g. <code>'2006-06-05'.toDate.extract(DAY)</code> leads to 5.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+DATE.quarter()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the quarter of a year from a SQL date. E.g. <code>'1994-09-27'.toDate.quarter()</code> leads to 3.</p>
</td>
</tr>
@@ -1531,7 +1542,7 @@ TIMEPOINT.floor(TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
- <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toDate.floor(MINUTE)</code> leads to 12:44:00.</p>
+ <p>Rounds a time point down to the given unit. E.g. <code>'12:44:31'.toDate.floor(MINUTE)</code> leads to 12:44:00.</p>
</td>
</tr>
@@ -1542,7 +1553,7 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT)
{% endhighlight %}
</td>
<td>
- <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.floor(MINUTE)</code> leads to 12:45:00.</p>
+ <p>Rounds a time point up to the given unit. E.g. <code>'12:44:31'.toTime.floor(MINUTE)</code> leads to 12:45:00.</p>
</td>
</tr>
@@ -1912,6 +1923,17 @@ TEMPORAL.extract(TimeIntervalUnit)
<tr>
<td>
{% highlight scala %}
+DATE.quarter()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the quarter of a year from a SQL date. E.g. <code>"1994-09-27".toDate.quarter()</code> leads to 3.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
TIMEPOINT.floor(TimeIntervalUnit)
{% endhighlight %}
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 449f9e2..61c6a65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -277,6 +277,13 @@ trait ImplicitExpressionOperations {
def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
/**
+ * Returns the quarter of a year from a SQL date.
+ *
+ * e.g. "1994-09-27".toDate.quarter() leads to 3
+ */
+ def quarter() = Quarter(expr)
+
+ /**
* Rounds down a time point to the given unit.
*
* e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index d8bd4c1..90a6bd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -199,6 +199,12 @@ object ScalarFunctions {
LONG_TYPE_INFO,
BuiltInMethod.UNIX_DATE_EXTRACT.method)
+ addSqlFunctionMethod(
+ EXTRACT_DATE,
+ Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.DATE),
+ LONG_TYPE_INFO,
+ BuiltInMethod.UNIX_DATE_EXTRACT.method)
+
addSqlFunction(
FLOOR,
Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index 385b3d5..4b1942e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -97,6 +97,8 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
temporal: RexNode,
relBuilder: FlinkRelBuilder)
: RexNode = {
+
+ // TODO convert this into Table API expressions to make the code more readable
val rexBuilder = relBuilder.getRexBuilder
val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
var result = rexBuilder.makeReinterpretCast(
@@ -248,3 +250,30 @@ case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = tr
case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
+/**
+ * Extracts the quarter of a year from a SQL date.
+ */
+case class Quarter(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(SqlTimeTypeInfo.DATE)
+
+ override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+ override def toString: String = s"($child).quarter()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ /**
+ * Standard conversion of the QUARTER operator.
+ * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertQuarter()]]
+ */
+ Plus(
+ Div(
+ Minus(
+ Extract(TimeIntervalUnit.MONTH, child),
+ Literal(1L)),
+ Literal(TimeUnit.QUARTER.multiplier.longValue())),
+ Literal(1L)
+ ).toRexNode
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index ef49356..f818e14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -158,7 +158,8 @@ object FunctionCatalog {
"currentTime" -> classOf[CurrentTime],
"currentTimestamp" -> classOf[CurrentTimestamp],
"localTime" -> classOf[LocalTime],
- "localTimestamp" -> classOf[LocalTimestamp]
+ "localTimestamp" -> classOf[LocalTimestamp],
+ "quarter" -> classOf[Quarter]
// TODO implement function overloading here
// "floor" -> classOf[TemporalFloor]
http://git-wip-us.apache.org/repos/asf/flink/blob/9420a775/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index c15751e..c9485fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -828,6 +828,27 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"true")
}
+ @Test
+ def testQuarter(): Unit = {
+ testAllApis(
+ "1997-01-27".toDate.quarter(),
+ "'1997-01-27'.toDate.quarter()",
+ "QUARTER(DATE '1997-01-27')",
+ "1")
+
+ testAllApis(
+ "1997-04-27".toDate.quarter(),
+ "'1997-04-27'.toDate.quarter()",
+ "QUARTER(DATE '1997-04-27')",
+ "2")
+
+ testAllApis(
+ "1997-12-31".toDate.quarter(),
+ "'1997-12-31'.toDate.quarter()",
+ "QUARTER(DATE '1997-12-31')",
+ "4")
+ }
+
// ----------------------------------------------------------------------------------------------
def testData = {