You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/25 14:40:12 UTC
flink git commit: [FLINK-9229] [table] Fix literal handling in code
generation
Repository: flink
Updated Branches:
refs/heads/master c50b573e9 -> 063aeb17d
[FLINK-9229] [table] Fix literal handling in code generation
This closes #5898.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/063aeb17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/063aeb17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/063aeb17
Branch: refs/heads/master
Commit: 063aeb17d273554949632338d788597617adcfb5
Parents: c50b573
Author: Timo Walther <tw...@apache.org>
Authored: Mon Apr 23 13:28:21 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Apr 25 16:33:07 2018 +0200
----------------------------------------------------------------------
.../flink/table/codegen/CodeGenerator.scala | 54 +++++++++++---------
.../codegen/calls/CurrentTimePointCallGen.scala | 10 ++--
.../codegen/calls/ScalarFunctionCallGen.scala | 2 +-
.../table/runtime/batch/table/CalcITCase.scala | 5 +-
4 files changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 44885e3..f0b6793 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
@@ -1209,7 +1209,7 @@ abstract class CodeGenerator(
case ObjectFieldAccessor(field) =>
// primitive
if (isFieldPrimitive(field)) {
- generateNonNullLiteral(fieldType, s"$inputTerm.${field.getName}")
+ generateTerm(fieldType, s"$inputTerm.${field.getName}")
}
// Object
else {
@@ -1238,7 +1238,7 @@ abstract class CodeGenerator(
val reflectiveAccessCode = reflectiveFieldReadAccess(fieldTerm, field, inputTerm)
// primitive
if (isFieldPrimitive(field)) {
- generateNonNullLiteral(fieldType, reflectiveAccessCode)
+ generateTerm(fieldType, reflectiveAccessCode)
}
// Object
else {
@@ -1255,16 +1255,16 @@ abstract class CodeGenerator(
private def generateNullLiteral(resultType: TypeInformation[_]): GeneratedExpression = {
val resultTerm = newName("result")
- val nullTerm = newName("isNull")
val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
val defaultValue = primitiveDefaultValue(resultType)
if (nullCheck) {
val wrappedCode = s"""
|$resultTypeTerm $resultTerm = $defaultValue;
- |boolean $nullTerm = true;
|""".stripMargin
- GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true)
+
+ // mark this expression as a constant literal
+ GeneratedExpression(resultTerm, ALWAYS_NULL, wrappedCode, resultType, literal = true)
} else {
throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
}
@@ -1274,33 +1274,41 @@ abstract class CodeGenerator(
literalType: TypeInformation[_],
literalCode: String)
: GeneratedExpression = {
- val resultTerm = newName("result")
- val nullTerm = newName("isNull")
- val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType)
-
- val resultCode = if (nullCheck) {
- s"""
- |$resultTypeTerm $resultTerm = $literalCode;
- |boolean $nullTerm = false;
- |""".stripMargin
- } else {
- s"""
- |$resultTypeTerm $resultTerm = $literalCode;
- |""".stripMargin
- }
- GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true)
+ // mark this expression as a constant literal
+ generateTerm(literalType, literalCode).copy(literal = true)
}
private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
GeneratedExpression(
qualifyEnum(enum),
- "false",
- "",
+ NEVER_NULL,
+ NO_CODE,
new GenericTypeInfo(enum.getDeclaringClass))
}
/**
+ * Generates access to a term (e.g. a field) that does not require unboxing logic.
+ *
+ * @param fieldType type of field
+ * @param fieldTerm expression term of field (already unboxed)
+ * @return internal unboxed field representation
+ */
+ private[flink] def generateTerm(
+ fieldType: TypeInformation[_],
+ fieldTerm: String)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
+
+ val resultCode = s"""
+ |$resultTypeTerm $resultTerm = $fieldTerm;
+ |""".stripMargin
+
+ GeneratedExpression(resultTerm, NEVER_NULL, resultCode, fieldType)
+ }
+
+ /**
* Converts the external boxed format to an internal mostly primitive field representation.
* Wrapper types can autoboxed to their corresponding primitive type (Integer -> int). External
* objects are converted to their internal representation (Timestamp -> internal timestamp
http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
index d644847..3462368 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
@@ -36,23 +36,23 @@ class CurrentTimePointCallGen(
: GeneratedExpression = targetType match {
case SqlTimeTypeInfo.TIME if local =>
val time = codeGenerator.addReusableLocalTime()
- codeGenerator.generateNonNullLiteral(targetType, time)
+ codeGenerator.generateTerm(targetType, time)
case SqlTimeTypeInfo.TIMESTAMP if local =>
val timestamp = codeGenerator.addReusableLocalTimestamp()
- codeGenerator.generateNonNullLiteral(targetType, timestamp)
+ codeGenerator.generateTerm(targetType, timestamp)
case SqlTimeTypeInfo.DATE =>
val date = codeGenerator.addReusableDate()
- codeGenerator.generateNonNullLiteral(targetType, date)
+ codeGenerator.generateTerm(targetType, date)
case SqlTimeTypeInfo.TIME =>
val time = codeGenerator.addReusableTime()
- codeGenerator.generateNonNullLiteral(targetType, time)
+ codeGenerator.generateTerm(targetType, time)
case SqlTimeTypeInfo.TIMESTAMP =>
val timestamp = codeGenerator.addReusableTimestamp()
- codeGenerator.generateNonNullLiteral(targetType, timestamp)
+ codeGenerator.generateTerm(targetType, timestamp)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 6fad573..df206de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -107,7 +107,7 @@ class ScalarFunctionCallGen(
// convert result of function to internal representation (input unboxing)
val resultUnboxing = if (resultClass.isPrimitive) {
- codeGenerator.generateNonNullLiteral(returnType, resultTerm)
+ codeGenerator.generateTerm(returnType, resultTerm)
} else {
codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/063aeb17/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index aa37d1b..71a87d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -30,19 +30,18 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
import org.apache.flink.types.Row
-import org.junit._
import org.junit.Assert.assertEquals
+import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.math.BigDecimal.RoundingMode
@RunWith(classOf[Parameterized])
class CalcITCase(