You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/25 14:40:12 UTC

flink git commit: [FLINK-9229] [table] Fix literal handling in code generation

Repository: flink
Updated Branches:
  refs/heads/master c50b573e9 -> 063aeb17d


[FLINK-9229] [table] Fix literal handling in code generation

This closes #5898.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/063aeb17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/063aeb17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/063aeb17

Branch: refs/heads/master
Commit: 063aeb17d273554949632338d788597617adcfb5
Parents: c50b573
Author: Timo Walther <tw...@apache.org>
Authored: Mon Apr 23 13:28:21 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Apr 25 16:33:07 2018 +0200

----------------------------------------------------------------------
 .../flink/table/codegen/CodeGenerator.scala     | 54 +++++++++++---------
 .../codegen/calls/CurrentTimePointCallGen.scala | 10 ++--
 .../codegen/calls/ScalarFunctionCallGen.scala   |  2 +-
 .../table/runtime/batch/table/CalcITCase.scala  |  5 +-
 4 files changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 44885e3..f0b6793 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
 import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
@@ -1209,7 +1209,7 @@ abstract class CodeGenerator(
           case ObjectFieldAccessor(field) =>
             // primitive
             if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, s"$inputTerm.${field.getName}")
+              generateTerm(fieldType, s"$inputTerm.${field.getName}")
             }
             // Object
             else {
@@ -1238,7 +1238,7 @@ abstract class CodeGenerator(
             val reflectiveAccessCode = reflectiveFieldReadAccess(fieldTerm, field, inputTerm)
             // primitive
             if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, reflectiveAccessCode)
+              generateTerm(fieldType, reflectiveAccessCode)
             }
             // Object
             else {
@@ -1255,16 +1255,16 @@ abstract class CodeGenerator(
 
   private def generateNullLiteral(resultType: TypeInformation[_]): GeneratedExpression = {
     val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
     val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
     val defaultValue = primitiveDefaultValue(resultType)
 
     if (nullCheck) {
       val wrappedCode = s"""
         |$resultTypeTerm $resultTerm = $defaultValue;
-        |boolean $nullTerm = true;
         |""".stripMargin
-      GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true)
+
+      // mark this expression as a constant literal
+      GeneratedExpression(resultTerm, ALWAYS_NULL, wrappedCode, resultType, literal = true)
     } else {
       throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
     }
@@ -1274,33 +1274,41 @@ abstract class CodeGenerator(
       literalType: TypeInformation[_],
       literalCode: String)
     : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType)
-
-    val resultCode = if (nullCheck) {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    } else {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |""".stripMargin
-    }
 
-    GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true)
+    // mark this expression as a constant literal
+    generateTerm(literalType, literalCode).copy(literal = true)
   }
 
   private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
     GeneratedExpression(
       qualifyEnum(enum),
-      "false",
-      "",
+      NEVER_NULL,
+      NO_CODE,
       new GenericTypeInfo(enum.getDeclaringClass))
   }
 
   /**
+    * Generates access to a term (e.g. a field) that does not require unboxing logic.
+    *
+    * @param fieldType type of field
+    * @param fieldTerm expression term of field (already unboxed)
+    * @return internal unboxed field representation
+    */
+  private[flink] def generateTerm(
+      fieldType: TypeInformation[_],
+      fieldTerm: String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
+
+    val resultCode = s"""
+        |$resultTypeTerm $resultTerm = $fieldTerm;
+        |""".stripMargin
+
+    GeneratedExpression(resultTerm, NEVER_NULL, resultCode, fieldType)
+  }
+
+  /**
     * Converts the external boxed format to an internal mostly primitive field representation.
     * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int). External
     * objects are converted to their internal representation (Timestamp -> internal timestamp

http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
index d644847..3462368 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
@@ -36,23 +36,23 @@ class CurrentTimePointCallGen(
     : GeneratedExpression = targetType match {
     case SqlTimeTypeInfo.TIME if local =>
       val time = codeGenerator.addReusableLocalTime()
-      codeGenerator.generateNonNullLiteral(targetType, time)
+      codeGenerator.generateTerm(targetType, time)
 
     case SqlTimeTypeInfo.TIMESTAMP if local =>
       val timestamp = codeGenerator.addReusableLocalTimestamp()
-      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+      codeGenerator.generateTerm(targetType, timestamp)
 
     case SqlTimeTypeInfo.DATE =>
       val date = codeGenerator.addReusableDate()
-      codeGenerator.generateNonNullLiteral(targetType, date)
+      codeGenerator.generateTerm(targetType, date)
 
     case SqlTimeTypeInfo.TIME =>
       val time = codeGenerator.addReusableTime()
-      codeGenerator.generateNonNullLiteral(targetType, time)
+      codeGenerator.generateTerm(targetType, time)
 
     case SqlTimeTypeInfo.TIMESTAMP =>
       val timestamp = codeGenerator.addReusableTimestamp()
-      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+      codeGenerator.generateTerm(targetType, timestamp)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 6fad573..df206de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -107,7 +107,7 @@ class ScalarFunctionCallGen(
 
     // convert result of function to internal representation (input unboxing)
     val resultUnboxing = if (resultClass.isPrimitive) {
-      codeGenerator.generateNonNullLiteral(returnType, resultTerm)
+      codeGenerator.generateTerm(returnType, resultTerm)
     } else {
       codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index aa37d1b..71a87d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -30,19 +30,18 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
 import org.apache.flink.types.Row
-import org.junit._
 import org.junit.Assert.assertEquals
+import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.math.BigDecimal.RoundingMode
 
 @RunWith(classOf[Parameterized])
 class CalcITCase(