You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/02 15:17:49 UTC

flink git commit: [FLINK-5524] [table] Support early out for code generated AND/OR conditions

Repository: flink
Updated Branches:
  refs/heads/master 82047f723 -> f6c9b32c1


[FLINK-5524] [table] Support early out for code generated AND/OR conditions

This closes #3372.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6c9b32c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6c9b32c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6c9b32c

Branch: refs/heads/master
Commit: f6c9b32c1f1fd7521e2dbe16ab20985a45d7cb07
Parents: 82047f7
Author: Kurt Young <yk...@gmail.com>
Authored: Tue Feb 21 14:35:17 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Mar 2 16:13:43 2017 +0100

----------------------------------------------------------------------
 .../table/codegen/calls/ScalarOperators.scala   | 152 +++++++++++--------
 .../table/expressions/ScalarOperatorsTest.scala |  50 +++++-
 .../utils/UserDefinedScalarFunctions.scala      |   6 +
 3 files changed, 134 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 3f7c91f..47a81ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -286,42 +286,51 @@ object ScalarOperators {
       // Unknown && False -> False
       // Unknown && Unknown -> Unknown
       s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
+         |${left.code}
+         |
+         |boolean $resultTerm = false;
+         |boolean $nullTerm = false;
+         |if (!${left.nullTerm} && !${left.resultTerm}) {
+         |  // left expr is false, skip right expr
+         |} else {
+         |  ${right.code}
+         |
+         |  if (!${left.nullTerm} && !${right.nullTerm}) {
+         |    $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+         |    $nullTerm = false;
+         |  }
+         |  else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+         |    $resultTerm = false;
+         |    $nullTerm = true;
+         |  }
+         |  else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+         |    $resultTerm = false;
+         |    $nullTerm = false;
+         |  }
+         |  else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+         |    $resultTerm = false;
+         |    $nullTerm = true;
+         |  }
+         |  else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+         |    $resultTerm = false;
+         |    $nullTerm = false;
+         |  }
+         |  else {
+         |    $resultTerm = false;
+         |    $nullTerm = true;
+         |  }
+         |}
+       """.stripMargin
     }
     else {
       s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |""".stripMargin
+         |${left.code}
+         |boolean $resultTerm = false;
+         |if (${left.resultTerm}) {
+         |  ${right.code}
+         |  $resultTerm = ${right.resultTerm};
+         |}
+         |""".stripMargin
     }
 
     GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
@@ -338,47 +347,56 @@ object ScalarOperators {
     val operatorCode = if (nullCheck) {
       // Three-valued logic:
       // no Unknown -> Two-valued logic
-      // True && Unknown -> True
-      // False && Unknown -> Unknown
-      // Unknown && True -> True
-      // Unknown && False -> Unknown
-      // Unknown && Unknown -> Unknown
+      // True || Unknown -> True
+      // False || Unknown -> Unknown
+      // Unknown || True -> True
+      // Unknown || False -> Unknown
+      // Unknown || Unknown -> Unknown
       s"""
         |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
+        |
+        |boolean $resultTerm = true;
+        |boolean $nullTerm = false;
+        |if (!${left.nullTerm} && ${left.resultTerm}) {
+        |  // left expr is true, skip right expr
+        |} else {
+        |  ${right.code}
+        |
+        |  if (!${left.nullTerm} && !${right.nullTerm}) {
+        |    $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |    $nullTerm = false;
+        |  }
+        |  else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |    $resultTerm = true;
+        |    $nullTerm = false;
+        |  }
+        |  else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |    $resultTerm = false;
+        |    $nullTerm = true;
+        |  }
+        |  else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |    $resultTerm = true;
+        |    $nullTerm = false;
+        |  }
+        |  else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |    $resultTerm = false;
+        |    $nullTerm = true;
+        |  }
+        |  else {
+        |    $resultTerm = false;
+        |    $nullTerm = true;
+        |  }
         |}
         |""".stripMargin
     }
     else {
       s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+         |${left.code}
+         |boolean $resultTerm = true;
+         |if (!${left.resultTerm}) {
+         |  ${right.code}
+         |  $resultTerm = ${right.resultTerm};
+         |}
         |""".stripMargin
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index ea8ac8a..a4dca93 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.table.expressions.utils.{ExpressionTestBase, ShouldNotExecuteFunc}
+import org.apache.flink.table.functions.ScalarFunction
 import org.junit.Test
 
 class ScalarOperatorsTest extends ExpressionTestBase {
@@ -89,10 +90,37 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testTableApi( +'f8, "+f8", "5") // additional space before "+" required because of checkstyle
     testTableApi(3.toExpr + 'f8, "3 + f8", "8")
 
-    // boolean arithmetic
-    testTableApi('f6 && true, "f6 && true", "true")
-    testTableApi('f6 && false, "f6 && false", "false")
-    testTableApi('f6 || false, "f6 || false", "true")
+    // boolean arithmetic: AND
+    testTableApi('f6 && true, "f6 && true", "true")      // true && true
+    testTableApi('f6 && false, "f6 && false", "false")   // true && false
+    testTableApi('f11 && true, "f11 && true", "false")   // false && true
+    testTableApi('f11 && false, "f11 && false", "false") // false && false
+    testTableApi('f6 && 'f12, "f6 && f12", "null")       // true && null
+    testTableApi('f11 && 'f12, "f11 && f12", "false")    // false && null
+    testTableApi('f12 && true, "f12 && true", "null")    // null && true
+    testTableApi('f12 && false, "f12 && false", "false") // null && false
+    testTableApi('f12 && 'f12, "f12 && f12", "null")     // null && null
+    testTableApi('f11 && ShouldNotExecuteFunc('f10),     // early out
+      "f11 && ShouldNotExecuteFunc(f10)", "false")
+    testTableApi('f6 && 'f11 && ShouldNotExecuteFunc('f10),  // early out
+      "f6 && f11 && ShouldNotExecuteFunc(f10)", "false")
+
+    // boolean arithmetic: OR
+    testTableApi('f6 || true, "f6 || true", "true")      // true || true
+    testTableApi('f6 || false, "f6 || false", "true")    // true || false
+    testTableApi('f11 || true, "f11 || true", "true")    // false || true
+    testTableApi('f11 || false, "f11 || false", "false") // false || false
+    testTableApi('f6 || 'f12, "f6 || f12", "true")       // true || null
+    testTableApi('f11 || 'f12, "f11 || f12", "null")     // false || null
+    testTableApi('f12 || true, "f12 || true", "true")    // null || true
+    testTableApi('f12 || false, "f12 || false", "null")  // null || false
+    testTableApi('f12 || 'f12, "f12 || f12", "null")     // null || null
+    testTableApi('f6 || ShouldNotExecuteFunc('f10),      // early out
+      "f6 || ShouldNotExecuteFunc(f10)", "true")
+    testTableApi('f11 || 'f6 || ShouldNotExecuteFunc('f10),  // early out
+      "f11 || f6 || ShouldNotExecuteFunc(f10)", "true")
+
+    // boolean arithmetic: NOT
     testTableApi(!'f6, "!f6", "false")
 
     // comparison
@@ -187,7 +215,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(11)
+    val testData = new Row(13)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -199,6 +227,8 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testData.setField(8, 5)
     testData.setField(9, 10)
     testData.setField(10, "String")
+    testData.setField(11, false)
+    testData.setField(12, null)
     testData
   }
 
@@ -214,8 +244,14 @@ class ScalarOperatorsTest extends ExpressionTestBase {
       Types.DOUBLE,
       Types.INT,
       Types.INT,
-      Types.STRING
+      Types.STRING,
+      Types.BOOLEAN,
+      Types.BOOLEAN
       ).asInstanceOf[TypeInformation[Any]]
   }
 
+  override def functions: Map[String, ScalarFunction] = Map(
+    "shouldNotExecuteFunc" -> ShouldNotExecuteFunc
+  )
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
index 4fee3b2..1258137 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
@@ -124,6 +124,12 @@ object Func12 extends ScalarFunction {
   }
 }
 
+object ShouldNotExecuteFunc extends ScalarFunction {
+  def eval(s: String): Boolean = {
+    throw new Exception("This func should never be executed")
+  }
+}
+
 class RichFunc0 extends ScalarFunction {
   var openCalled = false
   var closeCalled = false