You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/02 14:31:33 UTC
flink git commit: [FLINK-3497] [table] Add IS (NOT) TRUE/IS (NOT)
FALSE functions
Repository: flink
Updated Branches:
refs/heads/master 78f2a1586 -> dc3337a93
[FLINK-3497] [table] Add IS (NOT) TRUE/IS (NOT) FALSE functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc3337a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc3337a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc3337a9
Branch: refs/heads/master
Commit: dc3337a93b20dfc8dc8cdc0ec8c2bc4843e76a69
Parents: 78f2a15
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 2 16:23:04 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 2 16:30:08 2016 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 94 +++++++++++++++++++-
.../flink/api/scala/table/expressionDsl.scala | 10 +++
.../flink/api/table/codegen/CodeGenerator.scala | 20 +++++
.../table/codegen/calls/ScalarOperators.scala | 32 +++++++
.../flink/api/table/codegen/generated.scala | 6 ++
.../table/expressions/ExpressionParser.scala | 33 +++----
.../api/table/expressions/comparison.scala | 20 +++++
.../api/table/validate/FunctionCatalog.scala | 6 ++
.../table/expressions/ScalarFunctionsTest.scala | 57 +++++++++++-
9 files changed, 253 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 543945c..68a2b95 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -963,7 +963,7 @@ unary = [ "!" | "-" ] , composite ;
composite = suffixed | atom ;
-suffixed = interval | cast | as | aggregation | nullCheck | if | functionCall ;
+suffixed = interval | cast | as | aggregation | if | functionCall ;
interval = composite , "." , ("year" | "month" | "day" | "hour" | "minute" | "second" | "milli") ;
@@ -975,11 +975,9 @@ as = composite , ".as(" , fieldReference , ")" ;
aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" ) , [ "()" ] ;
-nullCheck = composite , ( ".isNull" | ".isNotNull" ) , [ "()" ] ;
-
if = composite , ".?(" , expression , "," , expression , ")" ;
-functionCall = composite , "." , functionIdentifier , "(" , [ expression , { "," , expression } ] , ")" ;
+functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
@@ -1234,6 +1232,50 @@ Both the Table API and SQL come with a set of built-in scalar functions for data
<tr>
<td>
{% highlight java %}
+ANY.isNull
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given expression is null.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+ANY.isNotNull
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given expression is not null.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+BOOLEAN.isTrue
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+BOOLEAN.isFalse
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given boolean expression is false. False otherwise (for null and true).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
NUMERIC.exp()
{% endhighlight %}
</td>
@@ -1498,6 +1540,50 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT)
<tr>
<td>
{% highlight scala %}
+ANY.isNull
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given expression is null.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+ANY.isNotNull
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given expression is not null.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+BOOLEAN.isTrue
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+BOOLEAN.isFalse
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns true if the given boolean expression is false. False otherwise (for null and true).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
NUMERIC.exp()
{% endhighlight %}
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index b14ca88..9bfe6c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -63,6 +63,16 @@ trait ImplicitExpressionOperations {
def isNull = IsNull(expr)
def isNotNull = IsNotNull(expr)
+ /**
+ * Returns true if the given boolean expression is true. False otherwise (for null and false).
+ */
+ def isTrue = IsTrue(expr)
+
+ /**
+ * Returns true if the given boolean expression is false. False otherwise (for null and true).
+ */
+ def isFalse = IsFalse(expr)
+
def + (other: Expression) = Plus(expr, other)
def - (other: Expression) = Minus(expr, other)
def / (other: Expression) = Div(expr, other)
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 4a3865f..6463ff9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -797,6 +797,26 @@ class CodeGenerator(
case CASE =>
generateIfElse(nullCheck, operands, resultType)
+ case IS_TRUE =>
+ val operand = operands.head
+ requireBoolean(operand)
+ generateIsTrue(operand)
+
+ case IS_NOT_TRUE =>
+ val operand = operands.head
+ requireBoolean(operand)
+ generateIsNotTrue(operand)
+
+ case IS_FALSE =>
+ val operand = operands.head
+ requireBoolean(operand)
+ generateIsFalse(operand)
+
+ case IS_NOT_FALSE =>
+ val operand = operands.head
+ requireBoolean(operand)
+ generateIsNotFalse(operand)
+
// casting
case CAST | REINTERPRET =>
val operand = operands.head
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
index afe69ed..094a224 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -384,6 +384,38 @@ object ScalarOperators {
}
}
+ def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ operand.resultTerm, // unknown is always false by default
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(!${operand.resultTerm})", // unknown is always false by default
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(!${operand.resultTerm} && !${operand.nullTerm})",
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(${operand.resultTerm} || ${operand.nullTerm})",
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
def generateCast(
nullCheck: Boolean,
operand: GeneratedExpression,
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
index 26c6696..bb52ad8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
@@ -34,4 +34,10 @@ case class GeneratedExpression(
code: String,
resultType: TypeInformation[_])
+object GeneratedExpression {
+ val ALWAYS_NULL = "true"
+ val NEVER_NULL = "false"
+ val NO_CODE = ""
+}
+
case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index cb92573..ae027e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -54,8 +54,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val MIN: Keyword = Keyword("min")
lazy val MAX: Keyword = Keyword("max")
lazy val SUM: Keyword = Keyword("sum")
- lazy val IS_NULL: Keyword = Keyword("isNull")
- lazy val IS_NOT_NULL: Keyword = Keyword("isNotNull")
lazy val CAST: Keyword = Keyword("cast")
lazy val NULL: Keyword = Keyword("Null")
lazy val IF: Keyword = Keyword("?")
@@ -79,7 +77,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
- not(SUM) ~ not(IS_NULL) ~ not(IS_NOT_NULL) ~ not(CAST) ~ not(NULL) ~
+ not(SUM) ~ not(CAST) ~ not(NULL) ~
not(IF) ~> super.ident
// symbols
@@ -169,12 +167,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// suffix operators
- lazy val suffixIsNull: PackratParser[Expression] =
- composite <~ "." ~ IS_NULL ~ opt("()") ^^ { e => IsNull(e) }
-
- lazy val suffixIsNotNull: PackratParser[Expression] =
- composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
-
lazy val suffixSum: PackratParser[Expression] =
composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
@@ -230,6 +222,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
}
+ lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
+ case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
+ }
+
lazy val suffixAsc : PackratParser[Expression] =
atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
@@ -264,20 +260,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
}
lazy val suffixed: PackratParser[Expression] =
- suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax |
+ suffixTimeInterval | suffixSum | suffixMin | suffixMax |
suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
suffixExtract | suffixFloor | suffixCeil |
- suffixFunctionCall // function call must always be at the end
+ suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
// prefix operators
- lazy val prefixIsNull: PackratParser[Expression] =
- IS_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNull(e) }
-
- lazy val prefixIsNotNull: PackratParser[Expression] =
- IS_NOT_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNotNull(e) }
-
lazy val prefixSum: PackratParser[Expression] =
SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
@@ -312,6 +302,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
}
+ lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
+ case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
+ }
+
lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
}
@@ -333,9 +327,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
}
lazy val prefixed: PackratParser[Expression] =
- prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
+ prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
- prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end
+ prefixFloor | prefixCeil |
+ prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
// suffix/prefix composite
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 0acfbf1..ad01674 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -121,3 +121,23 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
override private[flink] def resultType = BOOLEAN_TYPE_INFO
}
+
+case class IsTrue(child: Expression) extends UnaryExpression {
+ override def toString = s"($child).isTrue"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(SqlStdOperatorTable.IS_TRUE, child.toRexNode)
+ }
+
+ override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsFalse(child: Expression) extends UnaryExpression {
+ override def toString = s"($child).isFalse"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(SqlStdOperatorTable.IS_FALSE, child.toRexNode)
+ }
+
+ override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index fb38dde..b9a3f71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -117,6 +117,12 @@ class FunctionCatalog {
object FunctionCatalog {
val buildInFunctions: Map[String, Class[_]] = Map(
+ // logic
+ "isNull" -> classOf[IsNull],
+ "isNotNull" -> classOf[IsNotNull],
+ "isTrue" -> classOf[IsTrue],
+ "isFalse" -> classOf[IsFalse],
+
// aggregate functions
"avg" -> classOf[Avg],
"count" -> classOf[Count],
http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 7162a04..7ab0c7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -702,10 +702,61 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"1996-11-01")
}
+ @Test
+ def testIsTrueIsFalse(): Unit = {
+ testAllApis(
+ 'f1.isTrue,
+ "f1.isTrue",
+ "f1 IS TRUE",
+ "true")
+
+ testAllApis(
+ 'f21.isTrue,
+ "f21.isTrue",
+ "f21 IS TRUE",
+ "false")
+
+ testAllApis(
+ false.isFalse,
+ "false.isFalse",
+ "FALSE IS FALSE",
+ "true")
+
+ testAllApis(
+ 'f21.isFalse,
+ "f21.isFalse",
+ "f21 IS FALSE",
+ "false")
+
+ testAllApis(
+ !'f1.isTrue,
+ "!f1.isTrue",
+ "f1 IS NOT TRUE",
+ "false")
+
+ testAllApis(
+ !'f21.isTrue,
+ "!f21.isTrue",
+ "f21 IS NOT TRUE",
+ "true")
+
+ testAllApis(
+ !false.isFalse,
+ "!false.isFalse",
+ "FALSE IS NOT FALSE",
+ "false")
+
+ testAllApis(
+ !'f21.isFalse,
+ "!f21.isFalse",
+ "f21 IS NOT FALSE",
+ "true")
+ }
+
// ----------------------------------------------------------------------------------------------
def testData = {
- val testData = new Row(21)
+ val testData = new Row(22)
testData.setField(0, "This is a test String.")
testData.setField(1, true)
testData.setField(2, 42.toByte)
@@ -727,6 +778,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
testData.setField(19, 1467012213000L) // +16979 07:23:33.000
testData.setField(20, 25) // +2-01
+ testData.setField(21, null)
testData
}
@@ -752,6 +804,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
Types.TIME,
Types.TIMESTAMP,
Types.INTERVAL_MILLIS,
- Types.INTERVAL_MONTHS)).asInstanceOf[TypeInformation[Any]]
+ Types.INTERVAL_MONTHS,
+ Types.BOOLEAN)).asInstanceOf[TypeInformation[Any]]
}
}