You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/30 06:24:02 UTC
[flink] branch release-1.10 updated:
[FLINK-16727][table-planner-blink] Fix cast exception when having time
point literal as parameters
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new f619b77 [FLINK-16727][table-planner-blink] Fix cast exception when having time point literal as parameters
f619b77 is described below
commit f619b77a182cdfb9653caf12e37d32f9949cbdd7
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Mar 30 14:17:44 2020 +0800
[FLINK-16727][table-planner-blink] Fix cast exception when having time point literal as parameters
This closes #11550
---
.../codegen/calls/ScalarFunctionCallGen.scala | 10 +-----
.../flink/table/planner/expressions/call.scala | 1 -
.../functions/utils/ScalarSqlFunction.scala | 14 +-------
.../functions/utils/UserDefinedFunctionUtils.scala | 1 -
.../UserDefinedScalarFunctionTest.scala | 39 +++++++++++++++-------
5 files changed, 29 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
index 6fd62df..108f168 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
@@ -46,14 +46,6 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
operands: Seq[GeneratedExpression],
returnType: LogicalType): GeneratedExpression = {
val operandTypes = operands.map(_.resultType).toArray
- val arguments = operands.map {
- case expr if expr.literal =>
- getConverterForDataType(fromLogicalTypeToDataType(expr.resultType))
- .asInstanceOf[DataFormatConverters.DataFormatConverter[Any, Any]]
- .toExternal(expr.literalValue.get)
- .asInstanceOf[AnyRef]
- case _ => null
- }.toArray
// determine function method and result class
val resultClass = getResultTypeClassOfScalarFunction(scalarFunction, operandTypes)
@@ -70,7 +62,7 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
val evalResult = s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})"
val resultExternalType = UserDefinedFunctionUtils.getResultTypeOfScalarFunction(
- scalarFunction, arguments, operandTypes)
+ scalarFunction, operandTypes)
val setResult = {
if (resultClass.isPrimitive && isInternalClass(resultExternalType)) {
s"$resultTerm = $evalResult;"
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index b1bf10a..0b13c03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -164,7 +164,6 @@ case class PlannerScalarFunctionCall(
override private[flink] def resultType =
fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(
scalarFunction,
- Array(),
signature))
override private[flink] def validateInput(): ValidationResult = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 0acf357..18b101a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -80,20 +80,8 @@ object ScalarSqlFunction {
*/
new SqlReturnTypeInference {
override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
- val sqlTypes = opBinding.collectOperandTypes().asScala.toArray
val parameters = getOperandType(opBinding).toArray
-
- val arguments = sqlTypes.indices.map(i =>
- if (opBinding.isOperandNull(i, false)) {
- null
- } else if (opBinding.isOperandLiteral(i, false)) {
- opBinding.getOperandLiteralValue(
- i, getDefaultExternalClassForType(parameters(i))).asInstanceOf[AnyRef]
- } else {
- null
- }
- ).toArray
- val resultType = getResultTypeOfScalarFunction(scalarFunction, arguments, parameters)
+ val resultType = getResultTypeOfScalarFunction(scalarFunction, parameters)
typeFactory.createFieldTypeFromLogicalType(
fromDataTypeToLogicalType(resultType))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index c5779ff..da902bb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -615,7 +615,6 @@ object UserDefinedFunctionUtils {
def getResultTypeOfScalarFunction(
function: ScalarFunction,
- arguments: Array[AnyRef],
argTypes: Array[LogicalType]): DataType = {
val userDefinedTypeInfo = function.getResultType(getEvalMethodSignature(function, argTypes))
if (userDefinedTypeInfo != null) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
index 0f65507..2e9e6ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
@@ -25,26 +25,18 @@ import org.apache.flink.table.api.{DataTypes, Types, ValidationException}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.{ExpressionTestBase, _}
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions._
+import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.{DateFunction, DateTimeFunction, LocalDateFunction, LocalTimeFunction, TimeFunction, TimestampFunction}
import org.apache.flink.table.planner.utils.DateTimeTestUtil
import org.apache.flink.types.Row
+
import org.junit.Test
+
import java.lang.{Boolean => JBoolean}
import java.time.ZoneId
class UserDefinedScalarFunctionTest extends ExpressionTestBase {
@Test
- def test(): Unit = {
- val JavaFunc1 = new JavaFunc1()
-
- testAllApis(
- JavaFunc1(nullOf(DataTypes.TIME), 15, nullOf(DataTypes.TIMESTAMP(3))),
- "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
- "JavaFunc1(NULL, 15, NULL)",
- "null and 15 and null")
- }
-
- @Test
def testParameters(): Unit = {
testAllApis(
Func0('f0),
@@ -154,6 +146,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"Func0(Null(INT))",
"Func0(NULL)",
"-1")
+
+ val JavaFunc1 = new JavaFunc1()
+ testAllApis(
+ JavaFunc1(nullOf(DataTypes.TIME), 15, nullOf(DataTypes.TIMESTAMP(3))),
+ "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
+ "JavaFunc1(NULL, 15, NULL)",
+ "null and 15 and null")
}
@Test
@@ -258,6 +257,16 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
}
@Test
+ def testLiteralTemporalParameters(): Unit = {
+ testSqlApi("DateFunction(DATE '2020-03-27')", "2020-03-27")
+ testSqlApi("LocalDateFunction(DATE '2020-03-27')", "2020-03-27")
+ testSqlApi("TimeFunction(TIME '18:30:55')", "18:30:55")
+ testSqlApi("LocalTimeFunction(TIME '18:30:55')", "18:30:55")
+ testSqlApi("DateTimeFunction(TIMESTAMP '2020-03-27 18:30:55')", "2020-03-27T18:30:55")
+ testSqlApi("TimestampFunction(TIMESTAMP '2020-03-27 18:30:55')", "2020-03-27 18:30:55.0")
+ }
+
+ @Test
def testTimePointsOnPrimitivesInShanghai(): Unit = {
config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
@@ -565,7 +574,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"JavaFunc4" -> new JavaFunc4,
"RichFunc0" -> new RichFunc0,
"RichFunc1" -> new RichFunc1,
- "RichFunc2" -> new RichFunc2
+ "RichFunc2" -> new RichFunc2,
+ "DateFunction" -> DateFunction,
+ "LocalDateFunction" -> LocalDateFunction,
+ "TimeFunction" -> TimeFunction,
+ "LocalTimeFunction" -> LocalTimeFunction,
+ "DateTimeFunction" -> DateTimeFunction,
+ "TimestampFunction" -> TimestampFunction
)
}