Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/26 09:46:30 UTC

[flink] branch master updated (a4936a5 -> 82e373c)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a4936a5  [FLINK-13407][tests][coordination] Harden StandaloneResourceManagerTest
     new b16763f  [FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner
     new 82e373c  [FLINK-13391][table] Fix InputFormatTableSource#getDataStream should not call getReturnType.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/sources/InputFormatTableSource.java      |  7 +++-
 .../flink/table/planner/codegen/CodeGenUtils.scala | 14 ++++----
 .../table/planner/codegen/SinkCodeGenerator.scala  | 16 ---------
 .../codegen/calls/ScalarFunctionCallGen.scala      | 42 ++++++++++++++--------
 .../codegen/calls/TableFunctionCallGen.scala       | 31 ++--------------
 .../functions/utils/UserDefinedFunctionUtils.scala |  7 +++-
 .../nodes/physical/PhysicalTableSourceScan.scala   | 15 +++++++-
 .../batch/BatchExecBoundedStreamScan.scala         |  4 +--
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  3 +-
 .../physical/batch/BatchExecTableSourceScan.scala  | 20 +----------
 .../physical/stream/StreamExecDataStreamScan.scala |  4 +--
 .../nodes/physical/stream/StreamExecSink.scala     |  5 ++-
 .../stream/StreamExecTableSourceScan.scala         | 20 +----------
 .../flink/table/planner/plan/utils/ScanUtil.scala  |  9 +++--
 .../runtime/batch/sql/TableSourceITCase.scala      | 24 ++++++++++---
 .../runtime/stream/sql/TableSourceITCase.scala     | 27 ++++++++++++--
 .../table/planner/utils/testTableSources.scala     | 23 ++++++++++++
 17 files changed, 143 insertions(+), 128 deletions(-)


[flink] 02/02: [FLINK-13391][table] Fix InputFormatTableSource#getDataStream should not call getReturnType.

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82e373c6d12e957a6db5a956e17f39c6e8f6d51f
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 24 12:24:32 2019 +0800

    [FLINK-13391][table] Fix InputFormatTableSource#getDataStream should not call getReturnType.
---
 .../flink/table/sources/InputFormatTableSource.java       |  7 ++++++-
 .../plan/nodes/physical/PhysicalTableSourceScan.scala     | 15 ++++++++++++++-
 .../nodes/physical/batch/BatchExecTableSourceScan.scala   | 13 -------------
 .../nodes/physical/stream/StreamExecTableSourceScan.scala | 13 -------------
 .../flink/table/planner/utils/testTableSources.scala      |  3 ++-
 5 files changed, 22 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
index 79fc55a..8547aa2 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
@@ -20,9 +20,12 @@ package org.apache.flink.table.sources;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+
 /**
  * Defines an external bounded table and provides access to its data.
  *
@@ -44,8 +47,10 @@ public abstract class InputFormatTableSource<T> implements StreamTableSource<T>
 		return true;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
-		return execEnv.createInput(getInputFormat(), getReturnType());
+		TypeInformation<T> typeInfo = (TypeInformation<T>) fromDataTypeToLegacyInfo(getProducedDataType());
+		return execEnv.createInput(getInputFormat(), typeInfo);
 	}
 }
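
As a hedged illustration of the hunk above (a minimal sketch against the 1.9-era APIs used in this patch; RowCollectionSource and its fields are hypothetical names): a source now only has to report a correct getProducedDataType, because getDataStream derives the legacy TypeInformation from it via fromDataTypeToLegacyInfo instead of calling the deprecated getReturnType.

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.core.io.InputSplit
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.InputFormatTableSource
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

// Hypothetical source serving a fixed collection of rows.
class RowCollectionSource(schema: TableSchema, rows: Seq[Row])
  extends InputFormatTableSource[Row] {

  override def getInputFormat: InputFormat[Row, _ <: InputSplit] =
    new CollectionInputFormat[Row](
      rows.asJava,
      schema.toRowType.createSerializer(new ExecutionConfig))

  // The planner reads the produced type from here; getReturnType is no longer
  // consulted, so the deprecated default can be left untouched (or made to
  // throw, as the test source at the end of this patch does).
  override def getProducedDataType: DataType =
    fromLegacyInfoToDataType(schema.toRowType)

  override def getTableSchema: TableSchema = schema
}
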
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
index c0bf64a..aa85e2c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
@@ -18,9 +18,11 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical
 
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
@@ -38,6 +40,9 @@ abstract class PhysicalTableSourceScan(
     relOptTable: FlinkRelOptTable)
   extends TableScan(cluster, traitSet, relOptTable) {
 
+  // cache table source transformation.
+  protected var sourceTransform: Transformation[_] = _
+
   protected val tableSourceTable: TableSourceTable[_] =
     relOptTable.unwrap(classOf[TableSourceTable[_]])
 
@@ -52,4 +57,12 @@ abstract class PhysicalTableSourceScan(
     super.explainTerms(pw).item("fields", getRowType.getFieldNames.asScala.mkString(", "))
   }
 
+  def getSourceTransformation(
+      streamEnv: StreamExecutionEnvironment): Transformation[_] = {
+    if (sourceTransform == null) {
+      sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
+        getDataStream(streamEnv).getTransformation
+    }
+    sourceTransform
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 646320e..11a1dc5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext
@@ -54,9 +53,6 @@ class BatchExecTableSourceScan(
   with BatchPhysicalRel
   with BatchExecNode[BaseRow]{
 
-  // cache table source transformation.
-  private var sourceTransform: Transformation[_] = _
-
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new BatchExecTableSourceScan(cluster, traitSet, relOptTable)
   }
@@ -84,15 +80,6 @@ class BatchExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getSourceTransformation(
-      streamEnv: StreamExecutionEnvironment): Transformation[_] = {
-    if (sourceTransform == null) {
-      sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
-          getDataStream(streamEnv).getTransformation
-    }
-    sourceTransform
-  }
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index 32f0fd6..38a3819 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.{DataTypes, TableException}
@@ -61,9 +60,6 @@ class StreamExecTableSourceScan(
   with StreamPhysicalRel
   with StreamExecNode[BaseRow] {
 
-  // cache table source transformation.
-  private var sourceTransform: Transformation[_] = _
-
   override def producesUpdates: Boolean = false
 
   override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
@@ -96,15 +92,6 @@ class StreamExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getSourceTransformation(
-      streamEnv: StreamExecutionEnvironment): Transformation[_] = {
-    if (sourceTransform == null) {
-      sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
-          getDataStream(streamEnv).getTransformation
-    }
-    sourceTransform
-  }
-
   override protected def translateToPlanInternal(
       planner: StreamPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index e4cb20c..d2dcb1f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -613,7 +613,8 @@ class TestInputFormatTableSource[T](
     new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
   }
 
-  override def getReturnType: TypeInformation[T] = returnType
+  override def getReturnType: TypeInformation[T] =
+    throw new RuntimeException("Should not invoke this deprecated method.")
 
   override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)
 


[flink] 01/02: [FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b16763f8faf6b3da7db3a00e301cabd130252b84
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 24 11:08:15 2019 +0800

    [FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner
    
    The reason is that the blink planner used TypeExtractor to extract a class from the TableSource and fed that class to the DataFormatConverter. Instead, it should use the conversion class of the DataType returned by TableSource#getProducedDataType.
    
    This closes #9211
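
A hedged sketch of the mismatch described above (MySource is a hypothetical name; the TypeExtractor call mirrors the one this commit removes):

import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.sources.{StreamTableSource, TableSource}

// Before: recover the element class from the source's generic signature. For a
// source like `class MySource[T](...) extends StreamTableSource[T]` the type
// variable T is erased, extraction falls back to a generic type, and the wrong
// DataFormatConverter is chosen.
def classByExtraction(source: TableSource[_]): Class[_] =
  TypeExtractor
    .createTypeInfo(source, classOf[StreamTableSource[_]], source.getClass, 0)
    .getTypeClass

// After: ask the declared DataType, which always carries its conversion class.
def classByDataType(source: TableSource[_]): Class[_] =
  source.getProducedDataType.getConversionClass
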
---
 .../flink/table/planner/codegen/CodeGenUtils.scala | 14 ++++----
 .../table/planner/codegen/SinkCodeGenerator.scala  | 16 ---------
 .../codegen/calls/ScalarFunctionCallGen.scala      | 42 ++++++++++++++--------
 .../codegen/calls/TableFunctionCallGen.scala       | 31 ++--------------
 .../functions/utils/UserDefinedFunctionUtils.scala |  7 +++-
 .../batch/BatchExecBoundedStreamScan.scala         |  4 +--
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  3 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  7 +---
 .../physical/stream/StreamExecDataStreamScan.scala |  4 +--
 .../nodes/physical/stream/StreamExecSink.scala     |  5 ++-
 .../stream/StreamExecTableSourceScan.scala         |  7 +---
 .../flink/table/planner/plan/utils/ScanUtil.scala  |  9 +++--
 .../runtime/batch/sql/TableSourceITCase.scala      | 24 ++++++++++---
 .../runtime/stream/sql/TableSourceITCase.scala     | 27 ++++++++++++--
 .../table/planner/utils/testTableSources.scala     | 22 ++++++++++++
 15 files changed, 122 insertions(+), 100 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 8da2b22..6f097f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -201,10 +201,12 @@ object CodeGenUtils {
     * If it's internally compatible, don't need to DataStructure converter.
     * clazz != classOf[Row] => Row can only infer GenericType[Row].
     */
-  def isInternalClass(clazz: Class[_], t: DataType): Boolean =
+  def isInternalClass(t: DataType): Boolean = {
+    val clazz = t.getConversionClass
     clazz != classOf[Object] && clazz != classOf[Row] &&
-      (classOf[BaseRow].isAssignableFrom(clazz) ||
-          clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+        (classOf[BaseRow].isAssignableFrom(clazz) ||
+            clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+  }
 
   def hashCodeForType(
       ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
@@ -680,9 +682,8 @@ object CodeGenUtils {
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
       t: DataType,
-      clazz: Class[_],
       term: String): String = {
-    if (isInternalClass(clazz, t)) {
+    if (isInternalClass(t)) {
       s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
     } else {
       genToInternal(ctx, t, term)
@@ -705,9 +706,8 @@ object CodeGenUtils {
   def genToExternalIfNeeded(
       ctx: CodeGeneratorContext,
       t: DataType,
-      clazz: Class[_],
       term: String): String = {
-    if (isInternalClass(clazz, t)) {
+    if (isInternalClass(t)) {
       s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
     } else {
       genToExternal(ctx, t, term)
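
A minimal sketch of the new isInternalClass contract (hedged; behavior inferred from the definition in the hunk above): the conversion class now comes from the DataType itself, so only types whose declared bridging class is already an internal format skip the DataStructure converter.

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.planner.codegen.CodeGenUtils.isInternalClass

val rowType = DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT()))

// Default conversion class is org.apache.flink.types.Row -> converter needed.
assert(!isInternalClass(rowType))

// Bridged to the internal BaseRow format -> passes through unconverted.
assert(isInternalClass(rowType.bridgedTo(classOf[BaseRow])))
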
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 51d3483..e972ad7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
@@ -31,7 +30,6 @@ import org.apache.flink.table.dataformat.util.BaseRowUtil
 import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
-import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -42,20 +40,6 @@ import org.apache.flink.types.Row
 
 object SinkCodeGenerator {
 
-  private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
-    try {
-      sink match {
-        // DataStreamTableSink has no generic class, so we need get the type to get type class.
-        case sink: DataStreamTableSink[_] => sink.getConsumedDataType.getConversionClass
-        case _ => TypeExtractor.createTypeInfo(sink, classOf[TableSink[_]], sink.getClass, 0)
-                  .getTypeClass.asInstanceOf[Class[_]]
-      }
-    } catch {
-      case _: InvalidTypesException =>
-        classOf[Object]
-    }
-  }
-
   /** Code gen a operator to convert internal type rows to external type. **/
   def generateRowConverterOperator[OUT](
       ctx: CodeGeneratorContext,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
index 35e13a7..f1deb87 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.dataformat.DataFormatConverters
 import org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GenerateUtils, GeneratedExpression}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
@@ -74,7 +76,7 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
         val javaTerm = newName("javaResult")
         // it maybe a Internal class, so use resultClass is most safety.
         val javaTypeTerm = resultClass.getCanonicalName
-        val internal = genToInternalIfNeeded(ctx, resultExternalType, resultClass, javaTerm)
+        val internal = genToInternalIfNeeded(ctx, resultExternalType, javaTerm)
         s"""
             |$javaTypeTerm $javaTerm = ($javaTypeTerm) $evalResult;
             |$resultTerm = $javaTerm == null ? null : ($internal);
@@ -106,29 +108,39 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
       func: ScalarFunction): Array[GeneratedExpression] = {
-
     // get the expanded parameter types
     var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
+    prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
+  }
+
+}
+
+object ScalarFunctionCallGen {
 
-    val signatureTypes = func
-        .getParameterTypes(paramClasses)
-        .zipWithIndex
-        .map {
-          case (t, i) =>
-            // we don't trust GenericType.
-            if (t.isInstanceOf[GenericTypeInfo[_]]) {
-              fromLogicalTypeToDataType(operands(i).resultType)
-            } else {
-              fromLegacyInfoToDataType(t)
-            }
+  def prepareFunctionArgs(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      parameterClasses: Array[Class[_]],
+      parameterTypes: Array[TypeInformation[_]]): Array[GeneratedExpression] = {
+
+    val signatureTypes = parameterTypes.zipWithIndex.map {
+      case (t: GenericTypeInfo[_], i) =>
+        // we don't trust GenericType, like Row and BaseRow and LocalTime
+        val returnType = fromLogicalTypeToDataType(operands(i).resultType)
+        if (operands(i).resultType.supportsOutputConversion(t.getTypeClass)) {
+          returnType.bridgedTo(t.getTypeClass)
+        } else {
+          returnType
         }
+      case (t, _) => fromLegacyInfoToDataType(t)
+    }
 
-    paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
+    parameterClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
       if (paramClass.isPrimitive) {
         operandExpr
       } else {
         val externalResultTerm = genToExternalIfNeeded(
-          ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
+          ctx, signatureTypes(i), operandExpr.resultTerm)
         val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
         operandExpr.copy(resultTerm = exprOrNull)
       }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
index ea7c074..b832d49 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
@@ -18,15 +18,12 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.functions.TableFunction
-import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternalIfNeeded
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NEVER_NULL
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getEvalMethodSignature
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions
 
 /**
   * Generates a call to user-defined [[TableFunction]].
@@ -62,32 +59,8 @@ class TableFunctionCallGen(tableFunction: TableFunction[_]) extends CallGenerato
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
       func: TableFunction[_]): Array[GeneratedExpression] = {
-
     // get the expanded parameter types
     var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
-
-    val signatureTypes = func
-        .getParameterTypes(paramClasses)
-        .zipWithIndex
-        .map {
-          case (t, i) =>
-            // we don't trust GenericType.
-            if (t.isInstanceOf[GenericTypeInfo[_]]) {
-              fromLogicalTypeToDataType(operands(i).resultType)
-            } else {
-              TypeConversions.fromLegacyInfoToDataType(t)
-            }
-        }
-
-    paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
-      if (paramClass.isPrimitive) {
-        operandExpr
-      } else {
-        val externalResultTerm = genToExternalIfNeeded(
-          ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
-        val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
-        operandExpr.copy(resultTerm = exprOrNull)
-      }
-    }
+    prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index fe1ed46..1de25dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -171,7 +171,12 @@ object UserDefinedFunctionUtils {
       case (t: DataType, i) =>
         // we don't trust GenericType.
         if (fromDataTypeToLogicalType(t).getTypeRoot == LogicalTypeRoot.ANY) {
-          fromLogicalTypeToDataType(expectedTypes(i))
+          val returnType = fromLogicalTypeToDataType(expectedTypes(i))
+          if (expectedTypes(i).supportsOutputConversion(t.getConversionClass)) {
+            returnType.bridgedTo(t.getConversionClass)
+          } else {
+            returnType
+          }
         } else {
           t
         }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index 3238390..e7de711 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -110,9 +110,7 @@ class BatchExecBoundedStreamScan(
 
   def needInternalConversion: Boolean = {
     ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
-        ScanUtil.needsConversion(
-          boundedStreamTable.dataType,
-          boundedStreamTable.dataStream.getType.getTypeClass)
+        ScanUtil.needsConversion(boundedStreamTable.dataType)
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index 1e5c364..be8ac6f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -142,8 +142,7 @@ class BatchExecSink[T](
       // Sink's input must be BatchExecNode[BaseRow] now.
       case node: BatchExecNode[BaseRow] =>
         val plan = node.translateToPlan(planner)
-        val typeClass = extractTableSinkTypeClass(sink)
-        if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+        if (CodeGenUtils.isInternalClass(resultDataType)) {
           plan.asInstanceOf[Transformation[T]]
         } else {
           val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index f4b188c..646320e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableException
@@ -147,11 +146,7 @@ class BatchExecTableSourceScan(
       isStreamTable = false,
       tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(
-        tableSource.getProducedDataType,
-        TypeExtractor.createTypeInfo(
-          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
-          .getTypeClass.asInstanceOf[Class[_]])
+      ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
 
   def getEstimatedRowCount: lang.Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
index ca2ce5b..3306033 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
@@ -116,9 +116,7 @@ class StreamExecDataStreamScan(
 
     // when there is row time extraction expression, we need internal conversion
     // when the physical type of the input date stream is not BaseRow, we need internal conversion.
-    if (rowtimeExpr.isDefined || ScanUtil.needsConversion(
-      dataStreamTable.dataType,
-      dataStreamTable.dataStream.getType.getTypeClass)) {
+    if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) {
 
       // extract time if the index is -1 or -2.
       val (extractElement, resetElement) =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index 22a8ede..d505bf7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.{Table, TableException}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.SinkCodeGenerator.{extractTableSinkTypeClass, generateRowConverterOperator}
+import org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext}
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.`trait`.{AccMode, AccModeTraitDef}
@@ -233,8 +233,7 @@ class StreamExecSink[T](
     }
     val resultDataType = sink.getConsumedDataType
     val resultType = fromDataTypeToLegacyInfo(resultDataType)
-    val typeClass = extractTableSinkTypeClass(sink)
-    if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+    if (CodeGenUtils.isInternalClass(resultDataType)) {
       parTransformation.asInstanceOf[Transformation[T]]
     } else {
       val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bca83ce..32f0fd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
@@ -198,11 +197,7 @@ class StreamExecTableSourceScan(
       isStreamTable = true,
       tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(
-        tableSource.getProducedDataType,
-        TypeExtractor.createTypeInfo(
-          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
-          .getTypeClass.asInstanceOf[Class[_]])
+      ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 2beee54..7c1013a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContex
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.RowType
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -50,9 +51,13 @@ object ScanUtil {
         indexes.contains(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)||
         indexes.contains(TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER)
 
-  private[flink] def needsConversion(dataType: DataType, clz: Class[_]): Boolean =
+  private[flink] def needsConversion(source: TableSource[_]): Boolean = {
+    needsConversion(source.getProducedDataType)
+  }
+
+  private[flink] def needsConversion(dataType: DataType): Boolean =
     fromDataTypeToLogicalType(dataType) match {
-      case _: RowType => !CodeGenUtils.isInternalClass(clz, dataType)
+      case _: RowType => !CodeGenUtils.isInternalClass(dataType)
       case _ => true
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 60263e1..66ec0a1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,15 +23,14 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
-
 class TableSourceITCase extends BatchTestBase {
 
   @Before
@@ -40,7 +39,7 @@ class TableSourceITCase extends BatchTestBase {
     env.setParallelism(1) // set sink parallelism to 1
     val tableSchema = TableSchema.builder().fields(
       Array("a", "b", "c"),
-      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(Int.MaxValue))).build()
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
     tEnv.registerTableSource("MyTable", new TestProjectableTableSource(
       true,
       tableSchema,
@@ -205,4 +204,21 @@ class TableSourceITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testInputFormatSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c"),
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+    val tableSource = new TestInputFormatTableSource(
+      tableSchema, tableSchema.toRowType, TestData.smallData3)
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+    checkResult(
+      "SELECT a, c FROM MyInputFormatTable",
+      Seq(
+        row(1, "Hi"),
+        row(2, "Hello"),
+        row(3, "Hello world"))
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index 992d066..b1faaad 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
 
 import org.junit.Assert._
@@ -381,4 +381,25 @@ class TableSourceITCase extends StreamingTestBase {
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testInputFormatSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c"),
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+    val tableSource = new TestInputFormatTableSource(
+      tableSchema, tableSchema.toRowType, TestData.smallData3)
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,Hi",
+      "2,Hello",
+      "3,Hello world"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 411b0aa..e4cb20c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.planner.utils
 
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
@@ -32,6 +36,8 @@ import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSource
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.types.Row
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
@@ -597,3 +603,19 @@ class TestPartitionableTableSource(
 
   override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
 }
+
+class TestInputFormatTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T]) extends InputFormatTableSource[T] {
+
+  override def getInputFormat: InputFormat[T, _ <: InputSplit] = {
+    new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)
+
+  override def getTableSchema: TableSchema = tableSchema
+}