You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/30 06:24:02 UTC

[flink] branch release-1.10 updated: [FLINK-16727][table-planner-blink] Fix cast exception when having time point literal as parameters

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new f619b77  [FLINK-16727][table-planner-blink] Fix cast exception when having time point literal as parameters
f619b77 is described below

commit f619b77a182cdfb9653caf12e37d32f9949cbdd7
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Mar 30 14:17:44 2020 +0800

    [FLINK-16727][table-planner-blink] Fix cast exception when having time point literal as parameters
    
    This closes #11550
---
 .../codegen/calls/ScalarFunctionCallGen.scala      | 10 +-----
 .../flink/table/planner/expressions/call.scala     |  1 -
 .../functions/utils/ScalarSqlFunction.scala        | 14 +-------
 .../functions/utils/UserDefinedFunctionUtils.scala |  1 -
 .../UserDefinedScalarFunctionTest.scala            | 39 +++++++++++++++-------
 5 files changed, 29 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
index 6fd62df..108f168 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
@@ -46,14 +46,6 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
       operands: Seq[GeneratedExpression],
       returnType: LogicalType): GeneratedExpression = {
     val operandTypes = operands.map(_.resultType).toArray
-    val arguments = operands.map {
-      case expr if expr.literal =>
-        getConverterForDataType(fromLogicalTypeToDataType(expr.resultType))
-            .asInstanceOf[DataFormatConverters.DataFormatConverter[Any, Any]]
-            .toExternal(expr.literalValue.get)
-            .asInstanceOf[AnyRef]
-      case _ => null
-    }.toArray
     // determine function method and result class
     val resultClass = getResultTypeClassOfScalarFunction(scalarFunction, operandTypes)
 
@@ -70,7 +62,7 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
     val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
     val evalResult = s"$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")})"
     val resultExternalType = UserDefinedFunctionUtils.getResultTypeOfScalarFunction(
-      scalarFunction, arguments, operandTypes)
+      scalarFunction, operandTypes)
     val setResult = {
       if (resultClass.isPrimitive && isInternalClass(resultExternalType)) {
         s"$resultTerm = $evalResult;"
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index b1bf10a..0b13c03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -164,7 +164,6 @@ case class PlannerScalarFunctionCall(
   override private[flink] def resultType =
     fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(
       scalarFunction,
-      Array(),
       signature))
 
   override private[flink] def validateInput(): ValidationResult = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 0acf357..18b101a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -80,20 +80,8 @@ object ScalarSqlFunction {
       */
     new SqlReturnTypeInference {
       override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
-        val sqlTypes = opBinding.collectOperandTypes().asScala.toArray
         val parameters = getOperandType(opBinding).toArray
-
-        val arguments = sqlTypes.indices.map(i =>
-          if (opBinding.isOperandNull(i, false)) {
-            null
-          } else if (opBinding.isOperandLiteral(i, false)) {
-            opBinding.getOperandLiteralValue(
-              i, getDefaultExternalClassForType(parameters(i))).asInstanceOf[AnyRef]
-          } else {
-            null
-          }
-        ).toArray
-        val resultType = getResultTypeOfScalarFunction(scalarFunction, arguments, parameters)
+        val resultType = getResultTypeOfScalarFunction(scalarFunction, parameters)
         typeFactory.createFieldTypeFromLogicalType(
           fromDataTypeToLogicalType(resultType))
       }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index c5779ff..da902bb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -615,7 +615,6 @@ object UserDefinedFunctionUtils {
 
   def getResultTypeOfScalarFunction(
       function: ScalarFunction,
-      arguments: Array[AnyRef],
       argTypes: Array[LogicalType]): DataType = {
     val userDefinedTypeInfo = function.getResultType(getEvalMethodSignature(function, argTypes))
     if (userDefinedTypeInfo != null) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
index 0f65507..2e9e6ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/UserDefinedScalarFunctionTest.scala
@@ -25,26 +25,18 @@ import org.apache.flink.table.api.{DataTypes, Types, ValidationException}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils.{ExpressionTestBase, _}
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions._
+import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.{DateFunction, DateTimeFunction, LocalDateFunction, LocalTimeFunction, TimeFunction, TimestampFunction}
 import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.types.Row
+
 import org.junit.Test
+
 import java.lang.{Boolean => JBoolean}
 import java.time.ZoneId
 
 class UserDefinedScalarFunctionTest extends ExpressionTestBase {
 
   @Test
-  def test(): Unit = {
-    val JavaFunc1 = new JavaFunc1()
-
-    testAllApis(
-      JavaFunc1(nullOf(DataTypes.TIME), 15, nullOf(DataTypes.TIMESTAMP(3))),
-      "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
-      "JavaFunc1(NULL, 15, NULL)",
-      "null and 15 and null")
-  }
-
-  @Test
   def testParameters(): Unit = {
     testAllApis(
       Func0('f0),
@@ -154,6 +146,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "Func0(Null(INT))",
       "Func0(NULL)",
       "-1")
+
+    val JavaFunc1 = new JavaFunc1()
+    testAllApis(
+      JavaFunc1(nullOf(DataTypes.TIME), 15, nullOf(DataTypes.TIMESTAMP(3))),
+      "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
+      "JavaFunc1(NULL, 15, NULL)",
+      "null and 15 and null")
   }
 
   @Test
@@ -258,6 +257,16 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   }
 
   @Test
+  def testLiteralTemporalParameters(): Unit = {
+    testSqlApi("DateFunction(DATE '2020-03-27')", "2020-03-27")
+    testSqlApi("LocalDateFunction(DATE '2020-03-27')", "2020-03-27")
+    testSqlApi("TimeFunction(TIME '18:30:55')", "18:30:55")
+    testSqlApi("LocalTimeFunction(TIME '18:30:55')", "18:30:55")
+    testSqlApi("DateTimeFunction(TIMESTAMP '2020-03-27 18:30:55')", "2020-03-27T18:30:55")
+    testSqlApi("TimestampFunction(TIMESTAMP '2020-03-27 18:30:55')", "2020-03-27 18:30:55.0")
+  }
+
+  @Test
   def testTimePointsOnPrimitivesInShanghai(): Unit = {
     config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
 
@@ -565,7 +574,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
     "JavaFunc4" -> new JavaFunc4,
     "RichFunc0" -> new RichFunc0,
     "RichFunc1" -> new RichFunc1,
-    "RichFunc2" -> new RichFunc2
+    "RichFunc2" -> new RichFunc2,
+    "DateFunction" -> DateFunction,
+    "LocalDateFunction" -> LocalDateFunction,
+    "TimeFunction" -> TimeFunction,
+    "LocalTimeFunction" -> LocalTimeFunction,
+    "DateTimeFunction" -> DateTimeFunction,
+    "TimestampFunction" -> TimestampFunction
   )
 }