Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/29 14:09:41 UTC

[GitHub] [flink] twalthr opened a new pull request #10960: [FLINK-15487][table] Update code generation for new type inference

twalthr opened a new pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960
 
 
   ## What is the purpose of the change
   
   This updates the code generation for the new type inference and thus completes FLINK-15487. Scalar functions now work with the types supported by the planner. The tests added in this PR only cover basic behavior; more tests per data type are needed, but that is a follow-up issue.
   
   
   ## Brief change log
   
   - Update the code generation
   - Fix a couple of bugs and shortcomings
   
   ## Verifying this change
   
   `FunctionITCase` tests the implementation.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373505425
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
 
 Review comment:
   It seems that the cast added for Janino in `BridgingSqlFunctionCallGen` also covers the cast in this method, so we can get rid of the cast here. Let's see if Travis gives the green light.


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373482601
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
 
 Review comment:
   I added a comment. But actually the gap should be filled by the converters. I think the core problem is that `int` and `Integer` are handled the same way in the converters, even though the latter can be null and needs unboxing.
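   For illustration, a minimal, self-contained Java sketch of that distinction (this is not the planner's converter code; the class and method names are made up):

```java
// Not planner code: illustrates why `int` and `Integer` need different handling.
public final class PrimitiveVsBoxed {

    // A primitive argument can never be null, so no null check is needed.
    static long toInternal(int value) {
        return value;
    }

    // A boxed argument may be null and must be checked before unboxing,
    // otherwise the implicit unboxing throws a NullPointerException.
    static Long toInternal(Integer value) {
        if (value == null) {
            return null;
        }
        return (long) value.intValue();
    }

    public static void main(String[] args) {
        System.out.println(toInternal(42));              // 42
        System.out.println(toInternal((Integer) null));  // null, no NPE
    }
}
```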


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373754686
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +702,70 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceDataType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    val sourceType = sourceDataType.getLogicalType
+    val sourceClass = sourceDataType.getConversionClass
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceDataType)) {
+      s"$externalTerm"
     } else {
-      genToInternal(ctx, t, term)
+      genToInternal(ctx, sourceDataType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceClass.isPrimitive) {
+      generateNonNullField(sourceType, internalResultTerm)
 
 Review comment:
   I have some concerns about the case where the user declares a primitive conversion class, but the actual Java method returns a non-primitive value that could be null.
   What kind of error will we give users in this case? Maybe we can add a test?


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373535234
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -172,11 +184,7 @@ object CodeGenUtils {
     * We only use TypeInformation to store external type info.
     */
   def boxedTypeTermForExternalType(t: DataType): String = {
-    if (t.getConversionClass == null) {
-      ClassLogicalTypeConverter.getDefaultExternalClassForType(t.getLogicalType).getCanonicalName
-    } else {
-      t.getConversionClass.getCanonicalName
-    }
+    t.getConversionClass.getCanonicalName
 
 Review comment:
   BTW, why is the method called `boxed...`? We don't perform any boxing here.


[GitHub] [flink] flinkbot commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374675150
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -118,6 +119,17 @@ object CodeGenUtils {
     */
   def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName
 
+  /**
+   * Returns a term for representing the given class in Java code.
+   */
+  def typeTerm(clazz: Class[_]): String = {
 
 Review comment:
   I will update it, but in general, if we want to relax the UDF constraints in the future, we might need to support anonymous classes in our utilities as well.


[GitHub] [flink] twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-581970303
 
 
   @flinkbot run travis


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373490537
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
 ##########
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import java.lang.reflect.Method
+import java.util.Collections
+
+import org.apache.calcite.rex.{RexCall, RexCallBinding}
+import org.apache.flink.table.functions.UserDefinedFunctionHelper.SCALAR_EVAL
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction, UserDefinedFunctionHelper}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{genToExternalIfNeeded, genToInternalIfNeeded, typeTerm}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
+import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.extraction.utils.ExtractionUtils
+import org.apache.flink.table.types.extraction.utils.ExtractionUtils.{createMethodSignatureString, isAssignable, isMethodInvokable, primitiveToWrapper}
+import org.apache.flink.table.types.inference.TypeInferenceUtil
+import org.apache.flink.table.types.logical.LogicalType
+
+/**
+ * Generates a call to a user-defined [[ScalarFunction]] or [[TableFunction]] (future work).
+ */
+class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
+
+  private val function: BridgingSqlFunction = call.getOperator.asInstanceOf[BridgingSqlFunction]
+  private val udf: UserDefinedFunction = function.getDefinition.asInstanceOf[UserDefinedFunction]
+
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType)
+    : GeneratedExpression = {
+
+    val inference = function.getTypeInference
+
+    // we could have implemented a dedicated code generation context but the closer we are to
+    // Calcite the more consistent is the type inference during the data type enrichment
+    val callContext = new OperatorBindingCallContext(
+      function.getDataTypeFactory,
+      udf,
+      RexCallBinding.create(
+        function.getTypeFactory,
+        call,
+        Collections.emptyList()))
+
+    // enrich argument types with conversion class
+    val adaptedCallContext = TypeInferenceUtil.adaptArguments(
+      inference,
+      callContext,
+      null)
+    val enrichedArgumentDataTypes = toScala(adaptedCallContext.getArgumentDataTypes)
+    verifyArgumentTypes(operands.map(_.resultType), enrichedArgumentDataTypes)
+
+    // enrich output types with conversion class
+    val enrichedOutputDataType = TypeInferenceUtil.inferOutputType(
+      adaptedCallContext,
+      inference.getOutputTypeStrategy)
+    verifyOutputType(returnType, enrichedOutputDataType)
+
+    // find runtime method and generate call
+    verifyImplementation(enrichedArgumentDataTypes, enrichedOutputDataType)
+    generateFunctionCall(ctx, operands, enrichedArgumentDataTypes, enrichedOutputDataType)
+  }
+
+  private def generateFunctionCall(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      argumentDataTypes: Seq[DataType],
+      outputDataType: DataType)
+    : GeneratedExpression = {
+
+    val functionTerm = ctx.addReusableFunction(udf)
+
+    // operand conversion
+    val externalOperands = prepareExternalOperands(ctx, operands, argumentDataTypes)
+    val externalOperandTerms = externalOperands.map(_.resultTerm).mkString(", ")
+
+    // result conversion
+    val externalResultClass = outputDataType.getConversionClass
+    val externalResultTypeTerm = typeTerm(externalResultClass)
+    // Janino does not fully support the JVM spec:
+    // boolean b = (boolean) f(); where f returns Object
+    // This is not supported and we need to box manually.
+    val externalResultClassBoxed = primitiveToWrapper(externalResultClass)
+    val externalResultCasting = if (externalResultClass == externalResultClassBoxed) {
+      s"($externalResultTypeTerm)"
+    } else {
+      s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
 
 Review comment:
   We cannot determine the return class of the method at this point. The JVM will pick the right method to call with the given signature. The JVM should be smart enough to remove the cast here.
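   For reference, a schematic Java sketch of the casting pattern in question (hand-written, not the actual generated code; `eval` here is a stand-in whose static return type is only `Object`):

```java
public final class JaninoCastSketch {

    // Stand-in for a call whose static return type is Object.
    static Object eval() {
        return Boolean.TRUE;
    }

    public static void main(String[] args) {
        // Valid Java, but not supported by Janino: direct cast from Object to a primitive.
        // boolean direct = (boolean) eval();

        // Workaround used by the code generator: cast to the wrapper first,
        // then rely on auto-unboxing to reach the primitive.
        boolean viaWrapper = (boolean) (Boolean) eval();
        System.out.println(viaWrapper);
    }
}
```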


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373388215
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##########
 @@ -572,12 +572,15 @@ object GenerateUtils {
     * @param ctx code generator context which maintains various code statements.
     * @param fieldType type of field
     * @param fieldTerm expression term of field to be unboxed
+    * @param unboxingTerm unboxing/conversion term
     * @return internal unboxed field representation
     */
   def generateInputFieldUnboxing(
       ctx: CodeGeneratorContext,
       fieldType: LogicalType,
-      fieldTerm: String): GeneratedExpression = {
+      fieldTerm: String,
+      unboxingTerm: String)
 
 Review comment:
   `unboxingTerm` -> `outputTerm` or `outputUnboxingTerm`?


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373369121
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -118,6 +119,17 @@ object CodeGenUtils {
     */
   def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName
 
+  /**
+   * Returns a term for representing the given class in Java code.
+   */
+  def typeTerm(clazz: Class[_]): String = {
 
 Review comment:
   How about just `getCanonicalName`?


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146937422) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374681820
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +702,70 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceDataType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    val sourceType = sourceDataType.getLogicalType
+    val sourceClass = sourceDataType.getConversionClass
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceDataType)) {
+      s"$externalTerm"
     } else {
-      genToInternal(ctx, t, term)
+      genToInternal(ctx, sourceDataType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceClass.isPrimitive) {
+      generateNonNullField(sourceType, internalResultTerm)
 
 Review comment:
   Users will get a null pointer exception, which is expected if people override the default type inference and implement advanced features. Usually, this exception should not happen because people will use the extraction + annotations.
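   To make that failure mode concrete, a hand-written Java sketch (not generated planner code; the declared-primitive assumption is only simulated here):

```java
public final class PrimitiveDeclarationSketch {

    // Imagine a custom type inference that promised a primitive `long` result,
    // while the actual implementation returns a boxed Long that may be null.
    static Long eval(Integer arg) {
        return arg == null ? null : arg.longValue();
    }

    public static void main(String[] args) {
        // The generated call site treats the result as a primitive, so unboxing
        // a null return value fails with a NullPointerException.
        long ok = eval(3);       // fine
        long boom = eval(null);  // throws NullPointerException here
        System.out.println(ok + boom);
    }
}
```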


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373534528
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -172,11 +184,7 @@ object CodeGenUtils {
     * We only use TypeInformation to store external type info.
     */
   def boxedTypeTermForExternalType(t: DataType): String = {
-    if (t.getConversionClass == null) {
-      ClassLogicalTypeConverter.getDefaultExternalClassForType(t.getLogicalType).getCanonicalName
-    } else {
-      t.getConversionClass.getCanonicalName
-    }
+    t.getConversionClass.getCanonicalName
 
 Review comment:
   Use `typeTerm(t.getConversionClass)`


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373861463
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 ##########
 @@ -421,4 +431,173 @@ private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exc
 		tEnv().sqlUpdate("drop table t1");
 		tEnv().sqlUpdate("drop table t2");
 	}
+
+	@Test
+	public void testPrimitiveScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, 1L, "-"),
+			Row.of(2, 2L, "--"),
+			Row.of(3, 3L, "---")
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 3L, "-"),
+			Row.of(2, 6L, "--"),
+			Row.of(3, 9L, "---")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE TestTable(a INT NOT NULL, b BIGINT NOT NULL, c STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
+		tEnv().sqlUpdate("INSERT INTO TestTable SELECT a, PrimitiveScalarFunction(a, b, c), c FROM TestTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testComplexScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, new byte[]{1, 2, 3}),
+			Row.of(2, new byte[]{2, 3, 4}),
+			Row.of(3, new byte[]{3, 4, 5})
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[1, 2, 3]"),
+			Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[2, 3, 4]"),
+			Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[3, 4, 5]")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate(
+			"CREATE TABLE SourceTable(i INT, b BYTES) " +
+			"WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate(
+			"CREATE TABLE SinkTable(i INT, s1 STRING, s2 STRING, d DECIMAL(5, 2), s3 STRING) " +
+			"WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'), " +
+			"  ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789')," +
+			"  ComplexScalarFunction(), " +
+			"  ComplexScalarFunction(b) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testCustomScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1),
+			Row.of(2),
+			Row.of(3)
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 1, 5),
+			Row.of(2, 2, 5),
+			Row.of(3, 3, 5)
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  CustomScalarFunction(i), " +
+			"  CustomScalarFunction(CAST(NULL AS INT), 5, i, i) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testInvalidCustomScalarFunction() {
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		try {
+			tEnv().sqlUpdate(
+				"INSERT INTO SinkTable " +
+				"SELECT CustomScalarFunction('test')");
+			fail();
+		} catch (CodeGenException e) {
+			assertThat(
+				e,
+				hasMessage(
+					equalTo(
+						"Could not find an implementation method that matches the following " +
 
 Review comment:
   nit: Shall we print the class name of the UDF to which the function resolved and the available `eval` methods?
   
   I think users would appreciate that.
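   For illustration, a small reflection sketch of what such an error message could include (the `describeEvalMethods` helper below is hypothetical and not part of this PR):

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

public final class EvalMethodLister {

    // Hypothetical helper: lists a UDF class and its public eval methods for error messages.
    static String describeEvalMethods(Class<?> udfClass) {
        String methods = Arrays.stream(udfClass.getMethods())
            .filter(m -> m.getName().equals("eval"))
            .map(Method::toGenericString)
            .collect(Collectors.joining("\n  "));
        return "Function class: " + udfClass.getName()
            + "\nAvailable eval methods:\n  " + methods;
    }

    // Small stand-in class with two eval overloads for the demo output.
    static class DemoFunction {
        public Integer eval(Integer... args) { return args.length; }
        public String eval(String s) { return s; }
    }

    public static void main(String[] args) {
        System.out.println(describeEvalMethods(DemoFunction.class));
    }
}
```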


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373394902
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala
 ##########
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+import java.lang.reflect.Method
+import java.util.Collections
+
+import org.apache.calcite.rex.{RexCall, RexCallBinding}
+import org.apache.flink.table.functions.UserDefinedFunctionHelper.SCALAR_EVAL
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction, UserDefinedFunctionHelper}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{genToExternalIfNeeded, genToInternalIfNeeded, typeTerm}
+import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
+import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.extraction.utils.ExtractionUtils
+import org.apache.flink.table.types.extraction.utils.ExtractionUtils.{createMethodSignatureString, isAssignable, isMethodInvokable, primitiveToWrapper}
+import org.apache.flink.table.types.inference.TypeInferenceUtil
+import org.apache.flink.table.types.logical.LogicalType
+
+/**
+ * Generates a call to a user-defined [[ScalarFunction]] or [[TableFunction]] (future work).
+ */
+class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
+
+  private val function: BridgingSqlFunction = call.getOperator.asInstanceOf[BridgingSqlFunction]
+  private val udf: UserDefinedFunction = function.getDefinition.asInstanceOf[UserDefinedFunction]
+
+  override def generate(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      returnType: LogicalType)
+    : GeneratedExpression = {
+
+    val inference = function.getTypeInference
+
+    // we could have implemented a dedicated code generation context but the closer we are to
+    // Calcite the more consistent is the type inference during the data type enrichment
+    val callContext = new OperatorBindingCallContext(
+      function.getDataTypeFactory,
+      udf,
+      RexCallBinding.create(
+        function.getTypeFactory,
+        call,
+        Collections.emptyList()))
+
+    // enrich argument types with conversion class
+    val adaptedCallContext = TypeInferenceUtil.adaptArguments(
+      inference,
+      callContext,
+      null)
+    val enrichedArgumentDataTypes = toScala(adaptedCallContext.getArgumentDataTypes)
+    verifyArgumentTypes(operands.map(_.resultType), enrichedArgumentDataTypes)
+
+    // enrich output types with conversion class
+    val enrichedOutputDataType = TypeInferenceUtil.inferOutputType(
+      adaptedCallContext,
+      inference.getOutputTypeStrategy)
+    verifyOutputType(returnType, enrichedOutputDataType)
+
+    // find runtime method and generate call
+    verifyImplementation(enrichedArgumentDataTypes, enrichedOutputDataType)
+    generateFunctionCall(ctx, operands, enrichedArgumentDataTypes, enrichedOutputDataType)
+  }
+
+  private def generateFunctionCall(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      argumentDataTypes: Seq[DataType],
+      outputDataType: DataType)
+    : GeneratedExpression = {
+
+    val functionTerm = ctx.addReusableFunction(udf)
+
+    // operand conversion
+    val externalOperands = prepareExternalOperands(ctx, operands, argumentDataTypes)
+    val externalOperandTerms = externalOperands.map(_.resultTerm).mkString(", ")
+
+    // result conversion
+    val externalResultClass = outputDataType.getConversionClass
+    val externalResultTypeTerm = typeTerm(externalResultClass)
+    // Janino does not fully support the JVM spec:
+    // boolean b = (boolean) f(); where f returns Object
+    // This is not supported and we need to box manually.
+    val externalResultClassBoxed = primitiveToWrapper(externalResultClass)
+    val externalResultCasting = if (externalResultClass == externalResultClassBoxed) {
+      s"($externalResultTypeTerm)"
+    } else {
+      s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
 
 Review comment:
   Can we check the method's return class too? If it returns a primitive class, we don't need to add this cast.
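   For illustration, roughly the check being suggested (sketch only; note the reply elsewhere in this thread that the concrete eval `Method` is not known at this point of the code generation):

```java
import java.lang.reflect.Method;

public final class ReturnTypeCheckSketch {

    // Build the cast prefix for the generated call: skip the intermediate wrapper
    // cast when the resolved method already returns a primitive.
    static String castPrefix(Method method, String typeTerm, String boxedTypeTerm) {
        if (method.getReturnType().isPrimitive()) {
            return "(" + typeTerm + ")";
        }
        return "(" + typeTerm + ") (" + boxedTypeTerm + ")";
    }

    public static void main(String[] args) throws Exception {
        Method m = Integer.class.getMethod("parseInt", String.class); // returns primitive int
        System.out.println(castPrefix(m, "int", "java.lang.Integer")); // prints "(int)"
    }
}
```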


[GitHub] [flink] twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-581970076
 
 
   Thanks for the review @JingsongLi and @dawidwys. I hope I have addressed most of the feedback. I will merge this once Travis gives the green light. I will open a follow-up issue for more extensive tests covering all data types.


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373387992
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##########
 @@ -572,12 +572,15 @@ object GenerateUtils {
     * @param ctx code generator context which maintains various code statements.
     * @param fieldType type of field
     * @param fieldTerm expression term of field to be unboxed
+    * @param unboxingTerm unboxing/conversion term
     * @return internal unboxed field representation
     */
   def generateInputFieldUnboxing(
       ctx: CodeGeneratorContext,
       fieldType: LogicalType,
-      fieldTerm: String): GeneratedExpression = {
+      fieldTerm: String,
 
 Review comment:
   `fieldTerm` -> `inputTerm`?


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373427946
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -118,6 +119,17 @@ object CodeGenUtils {
     */
   def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName
 
+  /**
+   * Returns a term for representing the given class in Java code.
+   */
+  def typeTerm(clazz: Class[_]): String = {
 
 Review comment:
   Canonical name can be null. `Class.getName` always returns a value.
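   A quick demonstration of that difference (anonymous classes are one case where the canonical name is null):

```java
public final class CanonicalNameDemo {

    public static void main(String[] args) {
        Runnable anonymous = new Runnable() {
            @Override
            public void run() { }
        };
        // getCanonicalName() is null for anonymous (and local) classes ...
        System.out.println(anonymous.getClass().getCanonicalName()); // null
        // ... while getName() always returns a usable value.
        System.out.println(anonymous.getClass().getName());          // e.g. CanonicalNameDemo$1
    }
}
```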


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373860780
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 ##########
 @@ -421,4 +431,173 @@ private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exc
 		tEnv().sqlUpdate("drop table t1");
 		tEnv().sqlUpdate("drop table t2");
 	}
+
+	@Test
+	public void testPrimitiveScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, 1L, "-"),
+			Row.of(2, 2L, "--"),
+			Row.of(3, 3L, "---")
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 3L, "-"),
+			Row.of(2, 6L, "--"),
+			Row.of(3, 9L, "---")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE TestTable(a INT NOT NULL, b BIGINT NOT NULL, c STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
+		tEnv().sqlUpdate("INSERT INTO TestTable SELECT a, PrimitiveScalarFunction(a, b, c), c FROM TestTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testComplexScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, new byte[]{1, 2, 3}),
+			Row.of(2, new byte[]{2, 3, 4}),
+			Row.of(3, new byte[]{3, 4, 5})
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[1, 2, 3]"),
+			Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[2, 3, 4]"),
+			Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[3, 4, 5]")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate(
+			"CREATE TABLE SourceTable(i INT, b BYTES) " +
+			"WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate(
+			"CREATE TABLE SinkTable(i INT, s1 STRING, s2 STRING, d DECIMAL(5, 2), s3 STRING) " +
+			"WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'), " +
+			"  ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789')," +
+			"  ComplexScalarFunction(), " +
+			"  ComplexScalarFunction(b) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testCustomScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1),
+			Row.of(2),
+			Row.of(3)
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 1, 5),
+			Row.of(2, 2, 5),
+			Row.of(3, 3, 5)
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  CustomScalarFunction(i), " +
+			"  CustomScalarFunction(CAST(NULL AS INT), 5, i, i) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testInvalidCustomScalarFunction() {
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		try {
+			tEnv().sqlUpdate(
+				"INSERT INTO SinkTable " +
+				"SELECT CustomScalarFunction('test')");
+			fail();
+		} catch (CodeGenException e) {
+			assertThat(
+				e,
+				hasMessage(
+					equalTo(
+						"Could not find an implementation method that matches the following " +
+							"signature: java.lang.String eval(java.lang.String)")));
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Function that takes and returns primitives.
+	 */
+	public static class PrimitiveScalarFunction extends ScalarFunction {
+		public long eval(int i, long l, String s) {
+			return i + l + s.length();
+		}
+	}
+
+	/**
+	 * Function that is overloaded and takes use of annotations.
+	 */
+	public static class ComplexScalarFunction extends ScalarFunction {
+		public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o, java.sql.Timestamp t) {
+			return EncodingUtils.objectToString(o) + "+" + t.toString();
+		}
+
+		public @DataTypeHint("DECIMAL(5, 2)") BigDecimal eval() {
+			return new BigDecimal("123.4"); // 1 digit is missing
+		}
+
+		public String eval(byte[] bytes) {
+			return Arrays.toString(bytes);
+		}
+	}
+
+	/**
+	 * Function that has a custom type inference that is broader than the actual implementation.
+	 */
+	public static class CustomScalarFunction extends ScalarFunction {
+		public Integer eval(Integer... args) {
+			for (Integer o : args) {
+				if (o != null) {
+					return o;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+			return TypeInference.newBuilder()
+				.outputTypeStrategy(TypeStrategies.argument(0))
 
 Review comment:
   Shall we maybe make the `inputTypeStrategy` required as well, instead of defaulting to `WILDCARD`? I think we should not assume anything for users who end up defining their own type inference strategies.
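   As a sketch of what that could look like from the user's side (hedged: the builder method used below, `typedArguments`, is an assumption and not taken from this PR):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

// Sketch only: a custom inference that also declares its expected argument types
// instead of relying on the implicit WILDCARD input strategy.
public class ExplicitInputScalarFunction extends ScalarFunction {

    public Integer eval(Integer i) {
        return i;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
            .typedArguments(DataTypes.INT())                 // assumed builder method
            .outputTypeStrategy(TypeStrategies.argument(0))
            .build();
    }
}
```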


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373486485
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##########
 @@ -572,12 +572,15 @@ object GenerateUtils {
     * @param ctx code generator context which maintains various code statements.
     * @param fieldType type of field
     * @param fieldTerm expression term of field to be unboxed
+    * @param unboxingTerm unboxing/conversion term
     * @return internal unboxed field representation
     */
   def generateInputFieldUnboxing(
       ctx: CodeGeneratorContext,
       fieldType: LogicalType,
-      fieldTerm: String): GeneratedExpression = {
+      fieldTerm: String,
 
 Review comment:
   I think the method should be refactored in the future. In the Flink planner, this method was intended to perform `genToInternalIfNeeded`. Now the concepts are mixed up.


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373534145
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -118,6 +119,17 @@ object CodeGenUtils {
     */
   def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName
 
+  /**
+   * Returns a term for representing the given class in Java code.
+   */
+  def typeTerm(clazz: Class[_]): String = {
 
 Review comment:
   Hmm, I am also in favor of @JingsongLi's suggestion. As per the javadoc of `getCanonicalName`, I think if it gives us null, we cannot use the term anyway.
   
   We could add a check here that throws an exception stating that the given class does not have a canonical representation and thus cannot be used for code generation.
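   For example, a Java rendering of such a check could look like this (the real utility lives in Scala; this is just a sketch):

```java
public final class TypeTermCheck {

    // Fail fast when a class has no canonical name that can appear in generated code.
    static String typeTerm(Class<?> clazz) {
        String canonicalName = clazz.getCanonicalName();
        if (canonicalName == null) {
            throw new IllegalArgumentException(
                "Class '" + clazz.getName()
                    + "' has no canonical name and cannot be used in generated code.");
        }
        return canonicalName;
    }

    public static void main(String[] args) {
        System.out.println(typeTerm(java.util.List.class)); // java.util.List
        // typeTerm(new Runnable() { public void run() { } }.getClass()) would throw.
    }
}
```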


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374719017
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##########
 @@ -570,37 +570,40 @@ object GenerateUtils {
     * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int).
     *
     * @param ctx code generator context which maintains various code statements.
-    * @param fieldType type of field
-    * @param fieldTerm expression term of field to be unboxed
+    * @param inputType type of field
+    * @param inputTerm expression term of field to be unboxed
+    * @param inputUnboxingTerm unboxing/conversion term
     * @return internal unboxed field representation
     */
   def generateInputFieldUnboxing(
       ctx: CodeGeneratorContext,
-      fieldType: LogicalType,
-      fieldTerm: String): GeneratedExpression = {
+      inputType: LogicalType,
+      inputTerm: String,
+      inputUnboxingTerm: String)
 
 Review comment:
   With the old logic we were performing the `.toInternal()` conversion twice: once for the null check and once for the assignment. This change improves the generated runtime code.
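   Schematically (hand-written illustration, not the actual generated code; `toInternal` is a stand-in for an arbitrary conversion call):

```java
public final class SingleConversionSketch {

    static String toInternal(String external) {
        System.out.println("conversion executed");
        return external == null ? null : external.trim();
    }

    public static void main(String[] args) {
        String external = "  hello  ";

        // Old pattern: the conversion is emitted twice, once for the null check
        // and once for the assignment.
        boolean isNullOld = toInternal(external) == null;
        String resultOld = isNullOld ? null : toInternal(external);

        // New pattern: convert once into a dedicated term, then derive the null flag.
        String converted = toInternal(external);
        boolean isNullNew = converted == null;
        String resultNew = isNullNew ? null : converted;

        System.out.println(resultOld + " / " + resultNew);
    }
}
```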


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374717503
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -172,11 +184,7 @@ object CodeGenUtils {
     * We only use TypeInformation to store external type info.
     */
   def boxedTypeTermForExternalType(t: DataType): String = {
-    if (t.getConversionClass == null) {
-      ClassLogicalTypeConverter.getDefaultExternalClassForType(t.getLogicalType).getCanonicalName
-    } else {
-      t.getConversionClass.getCanonicalName
-    }
+    t.getConversionClass.getCanonicalName
 
 Review comment:
   Seems to be legacy. I removed it entirely.


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373427946
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -118,6 +119,17 @@ object CodeGenUtils {
     */
   def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName
 
+  /**
+   * Returns a term for representing the given class in Java code.
+   */
+  def typeTerm(clazz: Class[_]): String = {
 
 Review comment:
   The canonical name can be null, whereas `Class.getName` always returns a value.
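   For illustration, a small Java example of where the two differ (standard JDK behavior, not code from this PR):
   
   ```
   public class CanonicalNameDemo {
       public static void main(String[] args) {
           Object anonymous = new Object() {};
           // anonymous and local classes have no canonical name
           System.out.println(anonymous.getClass().getCanonicalName()); // null
           System.out.println(anonymous.getClass().getName());          // CanonicalNameDemo$1
           // arrays have a readable canonical name but a mangled binary name
           System.out.println(int[].class.getCanonicalName());          // int[]
           System.out.println(int[].class.getName());                   // [I
       }
   }
   ```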


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374718058
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +702,70 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceDataType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    val sourceType = sourceDataType.getLogicalType
+    val sourceClass = sourceDataType.getConversionClass
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceDataType)) {
+      s"$externalTerm"
     } else {
-      genToInternal(ctx, t, term)
+      genToInternal(ctx, sourceDataType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceClass.isPrimitive) {
+      generateNonNullField(sourceType, internalResultTerm)
+    } else {
+      generateInputFieldUnboxing(ctx, sourceType, externalTerm, internalResultTerm)
     }
   }
 
-  def genToExternal(ctx: CodeGeneratorContext, t: DataType, term: String): String = {
-    val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
-    if (isConverterIdentity(t)) {
-      s"($iTerm) $term"
+  def genToExternal(
 
 Review comment:
   I have the same feeling, but this should be a follow-up task. Currently, it is still used in a couple of places.


[GitHub] [flink] twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579773506
 
 
   CC @JingsongLi 


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373385389
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
+    } else {
+      genToInternal(ctx, sourceType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceType.getConversionClass.isPrimitive) {
+      generateNonNullField(sourceType.getLogicalType, internalResultTerm)
     } else {
-      genToInternal(ctx, t, term)
+      generateInputFieldUnboxing(ctx, sourceType.getLogicalType, externalTerm, internalResultTerm)
     }
   }
 
-  def genToExternal(ctx: CodeGeneratorContext, t: DataType, term: String): String = {
-    val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
-    if (isConverterIdentity(t)) {
-      s"($iTerm) $term"
+  def genToExternal(
+      ctx: CodeGeneratorContext,
+      targetType: DataType,
+      internalTerm: String): String = {
+    if (isConverterIdentity(targetType)) {
+      s"$internalTerm"
     } else {
-      val eTerm = boxedTypeTermForExternalType(t)
+      val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(targetType))
+      val eTerm = boxedTypeTermForExternalType(targetType)
 
 Review comment:
   If you don't mind, consider a hotfix to simplify `boxedTypeTermForExternalType`, because `getConversionClass` can no longer be null.


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:MANUAL TriggerID:581970303
   Hash:007a150958ef330ed3a13f6f8251cec8997e6fce Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:007a150958ef330ed3a13f6f8251cec8997e6fce
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146937422) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735) 
   * 007a150958ef330ed3a13f6f8251cec8997e6fce UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:CANCELED URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:MANUAL TriggerID:581970303
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:MANUAL TriggerID:581970303
   Hash:007a150958ef330ed3a13f6f8251cec8997e6fce Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4836 TriggerType:PUSH TriggerID:007a150958ef330ed3a13f6f8251cec8997e6fce
   Hash:007a150958ef330ed3a13f6f8251cec8997e6fce Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/147408364 TriggerType:PUSH TriggerID:007a150958ef330ed3a13f6f8251cec8997e6fce
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/146937422) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735) 
   * 007a150958ef330ed3a13f6f8251cec8997e6fce Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/147408364) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4836) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr closed pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960
 
 
   


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146937422) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:CANCELED URL:https://travis-ci.com/flink-ci/flink/builds/146937422 TriggerType:MANUAL TriggerID:581970303
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735 TriggerType:MANUAL TriggerID:581970303
   Hash:007a150958ef330ed3a13f6f8251cec8997e6fce Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4836 TriggerType:PUSH TriggerID:007a150958ef330ed3a13f6f8251cec8997e6fce
   Hash:007a150958ef330ed3a13f6f8251cec8997e6fce Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/147408364 TriggerType:PUSH TriggerID:007a150958ef330ed3a13f6f8251cec8997e6fce
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/146937422) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4735) 
   * 007a150958ef330ed3a13f6f8251cec8997e6fce Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/147408364) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4836) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:54efb5983f1197afa72cb57e3930c1ab8e9feb2c Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:54efb5983f1197afa72cb57e3930c1ab8e9feb2c
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   * 54efb5983f1197afa72cb57e3930c1ab8e9feb2c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373519914
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
 
 Review comment:
   The cast is actually not necessary anymore because of the cast in `BridgingSqlFunctionCallGen`. I'll remove it for now.


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373860042
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 ##########
 @@ -570,37 +570,40 @@ object GenerateUtils {
     * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int).
     *
     * @param ctx code generator context which maintains various code statements.
-    * @param fieldType type of field
-    * @param fieldTerm expression term of field to be unboxed
+    * @param inputType type of field
+    * @param inputTerm expression term of field to be unboxed
+    * @param inputUnboxingTerm unboxing/conversion term
     * @return internal unboxed field representation
     */
   def generateInputFieldUnboxing(
       ctx: CodeGeneratorContext,
-      fieldType: LogicalType,
-      fieldTerm: String): GeneratedExpression = {
+      inputType: LogicalType,
+      inputTerm: String,
+      inputUnboxingTerm: String)
 
 Review comment:
   I don't fully understand this change, but do we really need that parameter? Shouldn't we only ever check for null and assign the value from the same input term?


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373519202
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
 ##########
 @@ -305,8 +305,12 @@ private DataType extractDataTypeOrError(DataTypeTemplate template, List<Type> ty
 			DataTypeTemplate template,
 			List<Type> typeHierarchy,
 			Type type) {
+		// byte arrays have higher priority than regular arrays
 
 Review comment:
   nit: How about a comment like `prefer VARBINARY/BYTES() over ARRAY(TINYINT) for byte[]` instead?


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373527498
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/utils/FunctionMappingExtractor.java
 ##########
 @@ -329,15 +332,24 @@ public static SignatureExtraction createParameterSignatureExtraction(int offset)
 			int offset) {
 		return IntStream.range(offset, method.getParameterCount())
 			.mapToObj(i -> {
+				// check for input group before start extracting a data type
+				final Parameter parameter = method.getParameters()[i];
+				final DataTypeHint hint = parameter.getAnnotation(DataTypeHint.class);
+				if (hint != null) {
+					final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(hint, null);
+					if (template.inputGroup != null) {
+						return FunctionArgumentTemplate.of(template.inputGroup);
+					}
+				}
+				// extract a concrete data type
 				final DataType type = DataTypeExtractor.extractFromMethodParameter(typeFactory, function, method, i);
 				// unwrap from ARRAY data type in case of varargs
 				if (method.isVarArgs() && i == method.getParameterCount() - 1 && type instanceof CollectionDataType) {
-					return ((CollectionDataType) type).getElementDataType();
+					return FunctionArgumentTemplate.of(((CollectionDataType) type).getElementDataType());
 				} else {
-					return type;
+					return FunctionArgumentTemplate.of(type);
 
 Review comment:
   nit: two separate methods?
   ```
   // check for input group before start extracting a data type
   return tryExtractInputGroup(method, i)
   		// extract a concrete data type
      	.orElseGet(() -> extractUsingExtractor(typeFactory, function, method, i));
   ```


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373377618
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
+    } else {
+      genToInternal(ctx, sourceType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceType.getConversionClass.isPrimitive) {
+      generateNonNullField(sourceType.getLogicalType, internalResultTerm)
 
 Review comment:
   Replace `sourceType.getLogicalType` with a reused `fromDataTypeToLogicalType(sourceType)` field?


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374701172
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/utils/FunctionMappingExtractor.java
 ##########
 @@ -329,15 +332,24 @@ public static SignatureExtraction createParameterSignatureExtraction(int offset)
 			int offset) {
 		return IntStream.range(offset, method.getParameterCount())
 			.mapToObj(i -> {
+				// check for input group before start extracting a data type
+				final Parameter parameter = method.getParameters()[i];
+				final DataTypeHint hint = parameter.getAnnotation(DataTypeHint.class);
+				if (hint != null) {
+					final DataTypeTemplate template = DataTypeTemplate.fromAnnotation(hint, null);
+					if (template.inputGroup != null) {
+						return FunctionArgumentTemplate.of(template.inputGroup);
+					}
+				}
+				// extract a concrete data type
 				final DataType type = DataTypeExtractor.extractFromMethodParameter(typeFactory, function, method, i);
 				// unwrap from ARRAY data type in case of varargs
 				if (method.isVarArgs() && i == method.getParameterCount() - 1 && type instanceof CollectionDataType) {
-					return ((CollectionDataType) type).getElementDataType();
+					return FunctionArgumentTemplate.of(((CollectionDataType) type).getElementDataType());
 				} else {
-					return type;
+					return FunctionArgumentTemplate.of(type);
 
 Review comment:
   Thanks, I also fixed two other bugs along the way.


[GitHub] [flink] twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r374720708
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 ##########
 @@ -421,4 +431,173 @@ private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exc
 		tEnv().sqlUpdate("drop table t1");
 		tEnv().sqlUpdate("drop table t2");
 	}
+
+	@Test
+	public void testPrimitiveScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, 1L, "-"),
+			Row.of(2, 2L, "--"),
+			Row.of(3, 3L, "---")
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 3L, "-"),
+			Row.of(2, 6L, "--"),
+			Row.of(3, 9L, "---")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE TestTable(a INT NOT NULL, b BIGINT NOT NULL, c STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
+		tEnv().sqlUpdate("INSERT INTO TestTable SELECT a, PrimitiveScalarFunction(a, b, c), c FROM TestTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testComplexScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, new byte[]{1, 2, 3}),
+			Row.of(2, new byte[]{2, 3, 4}),
+			Row.of(3, new byte[]{3, 4, 5})
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[1, 2, 3]"),
+			Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[2, 3, 4]"),
+			Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[3, 4, 5]")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate(
+			"CREATE TABLE SourceTable(i INT, b BYTES) " +
+			"WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate(
+			"CREATE TABLE SinkTable(i INT, s1 STRING, s2 STRING, d DECIMAL(5, 2), s3 STRING) " +
+			"WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'), " +
+			"  ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789')," +
+			"  ComplexScalarFunction(), " +
+			"  ComplexScalarFunction(b) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testCustomScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1),
+			Row.of(2),
+			Row.of(3)
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 1, 5),
+			Row.of(2, 2, 5),
+			Row.of(3, 3, 5)
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  CustomScalarFunction(i), " +
+			"  CustomScalarFunction(CAST(NULL AS INT), 5, i, i) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testInvalidCustomScalarFunction() {
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		try {
+			tEnv().sqlUpdate(
+				"INSERT INTO SinkTable " +
+				"SELECT CustomScalarFunction('test')");
+			fail();
+		} catch (CodeGenException e) {
+			assertThat(
+				e,
+				hasMessage(
+					equalTo(
+						"Could not find an implementation method that matches the following " +
+							"signature: java.lang.String eval(java.lang.String)")));
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Function that takes and returns primitives.
+	 */
+	public static class PrimitiveScalarFunction extends ScalarFunction {
+		public long eval(int i, long l, String s) {
+			return i + l + s.length();
+		}
+	}
+
+	/**
+	 * Function that is overloaded and takes use of annotations.
+	 */
+	public static class ComplexScalarFunction extends ScalarFunction {
+		public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o, java.sql.Timestamp t) {
+			return EncodingUtils.objectToString(o) + "+" + t.toString();
+		}
+
+		public @DataTypeHint("DECIMAL(5, 2)") BigDecimal eval() {
+			return new BigDecimal("123.4"); // 1 digit is missing
+		}
+
+		public String eval(byte[] bytes) {
+			return Arrays.toString(bytes);
+		}
+	}
+
+	/**
+	 * Function that has a custom type inference that is broader than the actual implementation.
+	 */
+	public static class CustomScalarFunction extends ScalarFunction {
+		public Integer eval(Integer... args) {
+			for (Integer o : args) {
+				if (o != null) {
+					return o;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+			return TypeInference.newBuilder()
+				.outputTypeStrategy(TypeStrategies.argument(0))
 
 Review comment:
   Input validation is rather optional. We are mostly interested in the return type, which is why this is the only mandatory attribute.
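   For reference, a minimal custom type inference along those lines might look like the sketch below (imports from `org.apache.flink.table.types.inference` omitted; the class and method names are illustrative and follow the `CustomScalarFunction` pattern from this PR):
   
   ```
   public static class ArgumentMirroringFunction extends ScalarFunction {
       public Integer eval(Integer... args) {
           return args.length > 0 ? args[0] : null;
       }

       @Override
       public TypeInference getTypeInference(DataTypeFactory typeFactory) {
           // mandatory: how to derive the return type (here: mirror the first argument)
           return TypeInference.newBuilder()
               .outputTypeStrategy(TypeStrategies.argument(0))
               // no input strategy set: input validation is optional
               .build();
       }
   }
   ```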


[GitHub] [flink] flinkbot commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579773131
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ef2e38562b3356eb4bf7c54d207367bb52e3e76e (Wed Jan 29 14:12:48 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373382089
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +706,67 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceType)) {
+      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(sourceType))}) $externalTerm"
 
 Review comment:
   We need this cast because a UDF can return `Object` while the DataType declares an internal conversion class...
   Sometimes there is a gap between the conversion class and the actual Java return type of the function. It is safe to add this cast, and I think the JVM can optimize it so it does not affect performance.
   
   I think we can add a comment here.
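   As a plain-Java illustration of that gap (`BigDecimal` merely stands in for an internal format class; this is a conceptual sketch, not generator output):
   
   ```
   final class CastGapSketch {
       // stands in for an eval() whose declared return type is Object while the
       // produced value is already in the planner's internal representation
       static Object evalDeclaredAsObject() {
           return java.math.BigDecimal.ONE;
       }

       static java.math.BigDecimal callSite() {
           // the generated code inserts this cast; it is cheap and the JIT can usually elide it
           return (java.math.BigDecimal) evalDeclaredAsObject();
       }
   }
   ```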


[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373540213
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##########
 @@ -694,38 +702,70 @@ object CodeGenUtils {
     }
   }
 
+  /**
+   * Generates code for converting the given external source data type to the internal data format.
+   *
+   * Use this function for converting at the edges of the API.
+   */
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
-      t: DataType,
-      term: String): String = {
-    if (isInternalClass(t)) {
-      s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
+      sourceDataType: DataType,
+      externalTerm: String)
+    : GeneratedExpression = {
+    val sourceType = sourceDataType.getLogicalType
+    val sourceClass = sourceDataType.getConversionClass
+    // convert external source type to internal format
+    val internalResultTerm = if (isInternalClass(sourceDataType)) {
+      s"$externalTerm"
     } else {
-      genToInternal(ctx, t, term)
+      genToInternal(ctx, sourceDataType, externalTerm)
+    }
+    // extract null term from result term
+    if (sourceClass.isPrimitive) {
+      generateNonNullField(sourceType, internalResultTerm)
+    } else {
+      generateInputFieldUnboxing(ctx, sourceType, externalTerm, internalResultTerm)
     }
   }
 
-  def genToExternal(ctx: CodeGeneratorContext, t: DataType, term: String): String = {
-    val iTerm = boxedTypeTermForType(fromDataTypeToLogicalType(t))
-    if (isConverterIdentity(t)) {
-      s"($iTerm) $term"
+  def genToExternal(
 
 Review comment:
   nit (for future reference): I feel that we can unify the `genToExternal`/`genToInternal` methods.


[GitHub] [flink] flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#issuecomment-579780094
 
 
   <!--
   Meta data
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/146600040 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   Hash:ef2e38562b3356eb4bf7c54d207367bb52e3e76e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656 TriggerType:PUSH TriggerID:ef2e38562b3356eb4bf7c54d207367bb52e3e76e
   -->
   ## CI report:
   
   * ef2e38562b3356eb4bf7c54d207367bb52e3e76e Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146600040) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4656) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>
