You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/02 14:31:33 UTC

flink git commit: [FLINK-3497] [table] Add IS (NOT) TRUE/IS (NOT) FALSE functions

Repository: flink
Updated Branches:
  refs/heads/master 78f2a1586 -> dc3337a93


[FLINK-3497] [table] Add IS (NOT) TRUE/IS (NOT) FALSE functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc3337a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc3337a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc3337a9

Branch: refs/heads/master
Commit: dc3337a93b20dfc8dc8cdc0ec8c2bc4843e76a69
Parents: 78f2a15
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 2 16:23:04 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 2 16:30:08 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 94 +++++++++++++++++++-
 .../flink/api/scala/table/expressionDsl.scala   | 10 +++
 .../flink/api/table/codegen/CodeGenerator.scala | 20 +++++
 .../table/codegen/calls/ScalarOperators.scala   | 32 +++++++
 .../flink/api/table/codegen/generated.scala     |  6 ++
 .../table/expressions/ExpressionParser.scala    | 33 +++----
 .../api/table/expressions/comparison.scala      | 20 +++++
 .../api/table/validate/FunctionCatalog.scala    |  6 ++
 .../table/expressions/ScalarFunctionsTest.scala | 57 +++++++++++-
 9 files changed, 253 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 543945c..68a2b95 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -963,7 +963,7 @@ unary = [ "!" | "-" ] , composite ;
 
 composite = suffixed | atom ;
 
-suffixed = interval | cast | as | aggregation | nullCheck | if | functionCall ;
+suffixed = interval | cast | as | aggregation | if | functionCall ;
 
 interval = composite , "." , ("year" | "month" | "day" | "hour" | "minute" | "second" | "milli") ;
 
@@ -975,11 +975,9 @@ as = composite , ".as(" , fieldReference , ")" ;
 
 aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" ) , [ "()" ] ;
 
-nullCheck = composite , ( ".isNull" | ".isNotNull" ) , [ "()" ] ;
-
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
-functionCall = composite , "." , functionIdentifier , "(" , [ expression , { "," , expression } ] , ")" ;
+functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
 
 atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
 
@@ -1234,6 +1232,50 @@ Both the Table API and SQL come with a set of built-in scalar functions for data
     <tr>
       <td>
         {% highlight java %}
+ANY.isNull
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given expression is null.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+ANY.isNotNull
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given expression is not null.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+BOOLEAN.isTrue
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+BOOLEAN.isFalse
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given boolean expression is false. False otherwise (for null and true).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
 NUMERIC.exp()
 {% endhighlight %}
       </td>
@@ -1498,6 +1540,50 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT)
     <tr>
       <td>
         {% highlight scala %}
+ANY.isNull
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given expression is null.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+ANY.isNotNull
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given expression is not null.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+BOOLEAN.isTrue
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+BOOLEAN.isFalse
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns true if the given boolean expression is false. False otherwise (for null and true).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 NUMERIC.exp()
 {% endhighlight %}
       </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index b14ca88..9bfe6c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -63,6 +63,16 @@ trait ImplicitExpressionOperations {
   def isNull = IsNull(expr)
   def isNotNull = IsNotNull(expr)
 
+  /**
+    * Returns true if the given boolean expression is true. False otherwise (for null and false).
+    */
+  def isTrue = IsTrue(expr)
+
+  /**
+    * Returns true if the given boolean expression is false. False otherwise (for null and true).
+    */
+  def isFalse = IsFalse(expr)
+
   def + (other: Expression) = Plus(expr, other)
   def - (other: Expression) = Minus(expr, other)
   def / (other: Expression) = Div(expr, other)

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 4a3865f..6463ff9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -797,6 +797,26 @@ class CodeGenerator(
       case CASE =>
         generateIfElse(nullCheck, operands, resultType)
 
+      case IS_TRUE =>
+        val operand = operands.head
+        requireBoolean(operand)
+        generateIsTrue(operand)
+
+      case IS_NOT_TRUE =>
+        val operand = operands.head
+        requireBoolean(operand)
+        generateIsNotTrue(operand)
+
+      case IS_FALSE =>
+        val operand = operands.head
+        requireBoolean(operand)
+        generateIsFalse(operand)
+
+      case IS_NOT_FALSE =>
+        val operand = operands.head
+        requireBoolean(operand)
+        generateIsNotFalse(operand)
+
       // casting
       case CAST | REINTERPRET =>
         val operand = operands.head

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
index afe69ed..094a224 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -384,6 +384,38 @@ object ScalarOperators {
     }
   }
 
+  def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
+    GeneratedExpression(
+      operand.resultTerm, // unknown is always false by default
+      GeneratedExpression.NEVER_NULL,
+      operand.code,
+      BOOLEAN_TYPE_INFO)
+  }
+
+  def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
+    GeneratedExpression(
+      s"(!${operand.resultTerm})", // unknown is always false by default
+      GeneratedExpression.NEVER_NULL,
+      operand.code,
+      BOOLEAN_TYPE_INFO)
+  }
+
+  def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
+    GeneratedExpression(
+      s"(!${operand.resultTerm} && !${operand.nullTerm})",
+      GeneratedExpression.NEVER_NULL,
+      operand.code,
+      BOOLEAN_TYPE_INFO)
+  }
+
+  def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
+    GeneratedExpression(
+      s"(${operand.resultTerm} || ${operand.nullTerm})",
+      GeneratedExpression.NEVER_NULL,
+      operand.code,
+      BOOLEAN_TYPE_INFO)
+  }
+
   def generateCast(
       nullCheck: Boolean,
       operand: GeneratedExpression,

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
index 26c6696..bb52ad8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
@@ -34,4 +34,10 @@ case class GeneratedExpression(
     code: String,
     resultType: TypeInformation[_])
 
+object GeneratedExpression {
+  val ALWAYS_NULL = "true"
+  val NEVER_NULL = "false"
+  val NO_CODE = ""
+}
+
 case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index cb92573..ae027e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -54,8 +54,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val MIN: Keyword = Keyword("min")
   lazy val MAX: Keyword = Keyword("max")
   lazy val SUM: Keyword = Keyword("sum")
-  lazy val IS_NULL: Keyword = Keyword("isNull")
-  lazy val IS_NOT_NULL: Keyword = Keyword("isNotNull")
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val IF: Keyword = Keyword("?")
@@ -79,7 +77,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
-      not(SUM) ~ not(IS_NULL) ~ not(IS_NOT_NULL) ~ not(CAST) ~ not(NULL) ~
+      not(SUM) ~ not(CAST) ~ not(NULL) ~
       not(IF) ~> super.ident
 
   // symbols
@@ -169,12 +167,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   // suffix operators
 
-  lazy val suffixIsNull: PackratParser[Expression] =
-    composite <~ "." ~ IS_NULL ~ opt("()") ^^ { e => IsNull(e) }
-
-  lazy val suffixIsNotNull: PackratParser[Expression] =
-    composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }
-
   lazy val suffixSum: PackratParser[Expression] =
     composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
 
@@ -230,6 +222,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
   }
 
+  lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
+    case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
+  }
+
   lazy val suffixAsc : PackratParser[Expression] =
     atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
 
@@ -264,20 +260,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax |
+    suffixTimeInterval | suffixSum | suffixMin | suffixMax |
       suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
       suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
       suffixExtract | suffixFloor | suffixCeil |
-      suffixFunctionCall // function call must always be at the end
+      suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
 
   // prefix operators
 
-  lazy val prefixIsNull: PackratParser[Expression] =
-    IS_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNull(e) }
-
-  lazy val prefixIsNotNull: PackratParser[Expression] =
-    IS_NOT_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNotNull(e) }
-
   lazy val prefixSum: PackratParser[Expression] =
     SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
 
@@ -312,6 +302,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
   }
 
+  lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
+    case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
+  }
+
   lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
     case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
   }
@@ -333,9 +327,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val prefixed: PackratParser[Expression] =
-    prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
+    prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
       prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
-      prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end
+      prefixFloor | prefixCeil |
+      prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
 
   // suffix/prefix composite
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 0acfbf1..ad01674 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -121,3 +121,23 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
 
   override private[flink] def resultType = BOOLEAN_TYPE_INFO
 }
+
+case class IsTrue(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_TRUE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsFalse(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_FALSE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index fb38dde..b9a3f71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -117,6 +117,12 @@ class FunctionCatalog {
 object FunctionCatalog {
 
   val buildInFunctions: Map[String, Class[_]] = Map(
+    // logic
+    "isNull" -> classOf[IsNull],
+    "isNotNull" -> classOf[IsNotNull],
+    "isTrue" -> classOf[IsTrue],
+    "isFalse" -> classOf[IsFalse],
+
     // aggregate functions
     "avg" -> classOf[Avg],
     "count" -> classOf[Count],

http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 7162a04..7ab0c7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -702,10 +702,61 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "1996-11-01")
   }
 
+  @Test
+  def testIsTrueIsFalse(): Unit = {
+    testAllApis(
+      'f1.isTrue,
+      "f1.isTrue",
+      "f1 IS TRUE",
+      "true")
+
+    testAllApis(
+      'f21.isTrue,
+      "f21.isTrue",
+      "f21 IS TRUE",
+      "false")
+
+    testAllApis(
+      false.isFalse,
+      "false.isFalse",
+      "FALSE IS FALSE",
+      "true")
+
+    testAllApis(
+      'f21.isFalse,
+      "f21.isFalse",
+      "f21 IS FALSE",
+      "false")
+
+    testAllApis(
+      !'f1.isTrue,
+      "!f1.isTrue",
+      "f1 IS NOT TRUE",
+      "false")
+
+    testAllApis(
+      !'f21.isTrue,
+      "!f21.isTrue",
+      "f21 IS NOT TRUE",
+      "true")
+
+    testAllApis(
+      !false.isFalse,
+      "!false.isFalse",
+      "FALSE IS NOT FALSE",
+      "false")
+
+    testAllApis(
+      !'f21.isFalse,
+      "!f21.isFalse",
+      "f21 IS NOT FALSE",
+      "true")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(21)
+    val testData = new Row(22)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -727,6 +778,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
     testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
     testData.setField(19, 1467012213000L) // +16979 07:23:33.000
     testData.setField(20, 25) // +2-01
+    testData.setField(21, null)
     testData
   }
 
@@ -752,6 +804,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       Types.TIME,
       Types.TIMESTAMP,
       Types.INTERVAL_MILLIS,
-      Types.INTERVAL_MONTHS)).asInstanceOf[TypeInformation[Any]]
+      Types.INTERVAL_MONTHS,
+      Types.BOOLEAN)).asInstanceOf[TypeInformation[Any]]
   }
 }