Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/26 09:46:31 UTC

[flink] 01/02: [FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b16763f8faf6b3da7db3a00e301cabd130252b84
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 24 11:08:15 2019 +0800

    [FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner
    
    The reason is that the blink planner used TypeExtractor to extract a class from the TableSource and fed that class to the DataFormatConverter. Instead, it should use the conversion class of the DataType returned by TableSource#getProducedDataType.
    
    This closes #9211
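
For illustration, a minimal sketch of the failure mode (MyRowSource is
hypothetical, not part of this commit): when a source is instantiated with an
erased type parameter, TypeExtractor either throws InvalidTypesException or
infers only a GenericType, whereas the produced DataType carries the
conversion class directly.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableSchema
    import org.apache.flink.table.sources.StreamTableSource
    import org.apache.flink.table.types.DataType
    import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType

    // A generic source: T is erased, so reflection cannot recover it at runtime.
    class MyRowSource[T](
        schema: TableSchema,
        returnType: TypeInformation[T],
        data: java.util.List[T]) extends StreamTableSource[T] {
      override def getDataStream(env: StreamExecutionEnvironment): DataStream[T] =
        env.fromCollection(data, returnType)
      override def getReturnType: TypeInformation[T] = returnType
      override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)
      override def getTableSchema: TableSchema = schema
    }

    // Before: the planner asked reflection for the source's element class.
    def classViaReflection(source: StreamTableSource[_]): Class[_] =
      TypeExtractor.createTypeInfo(source, classOf[StreamTableSource[_]], source.getClass, 0)
        .getTypeClass

    // After: the class comes from the declared produced DataType.
    def classViaDataType(source: StreamTableSource[_]): Class[_] =
      source.getProducedDataType.getConversionClass
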
---
 .../flink/table/planner/codegen/CodeGenUtils.scala | 14 ++++----
 .../table/planner/codegen/SinkCodeGenerator.scala  | 16 ---------
 .../codegen/calls/ScalarFunctionCallGen.scala      | 42 ++++++++++++++--------
 .../codegen/calls/TableFunctionCallGen.scala       | 31 ++--------------
 .../functions/utils/UserDefinedFunctionUtils.scala |  7 +++-
 .../batch/BatchExecBoundedStreamScan.scala         |  4 +--
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  3 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  7 +---
 .../physical/stream/StreamExecDataStreamScan.scala |  4 +--
 .../nodes/physical/stream/StreamExecSink.scala     |  5 ++-
 .../stream/StreamExecTableSourceScan.scala         |  7 +---
 .../flink/table/planner/plan/utils/ScanUtil.scala  |  9 +++--
 .../runtime/batch/sql/TableSourceITCase.scala      | 24 ++++++++++---
 .../runtime/stream/sql/TableSourceITCase.scala     | 27 ++++++++++++--
 .../table/planner/utils/testTableSources.scala     | 22 ++++++++++++
 15 files changed, 122 insertions(+), 100 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 8da2b22..6f097f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -201,10 +201,12 @@ object CodeGenUtils {
     * If it's internally compatible, don't need to DataStructure converter.
     * clazz != classOf[Row] => Row can only infer GenericType[Row].
     */
-  def isInternalClass(clazz: Class[_], t: DataType): Boolean =
+  def isInternalClass(t: DataType): Boolean = {
+    val clazz = t.getConversionClass
     clazz != classOf[Object] && clazz != classOf[Row] &&
-      (classOf[BaseRow].isAssignableFrom(clazz) ||
-          clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+        (classOf[BaseRow].isAssignableFrom(clazz) ||
+            clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+  }
 
   def hashCodeForType(
       ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
@@ -680,9 +682,8 @@ object CodeGenUtils {
   def genToInternalIfNeeded(
       ctx: CodeGeneratorContext,
       t: DataType,
-      clazz: Class[_],
       term: String): String = {
-    if (isInternalClass(clazz, t)) {
+    if (isInternalClass(t)) {
       s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
     } else {
       genToInternal(ctx, t, term)
@@ -705,9 +706,8 @@ object CodeGenUtils {
   def genToExternalIfNeeded(
       ctx: CodeGeneratorContext,
       t: DataType,
-      clazz: Class[_],
       term: String): String = {
-    if (isInternalClass(clazz, t)) {
+    if (isInternalClass(t)) {
       s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
     } else {
       genToExternal(ctx, t, term)
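
Both call sites now derive the class from the DataType itself via
getConversionClass. A minimal sketch, assuming the Table API on this branch,
of what isInternalClass(t) sees for the two interesting ROW cases:

    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.dataformat.BaseRow
    import org.apache.flink.types.Row

    val t = DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT()))
    // The default conversion class of a ROW DataType is the external Row class,
    // which isInternalClass excludes explicitly: a converter is generated.
    println(t.getConversionClass == classOf[Row]) // true

    // Bridged to BaseRow, the same logical type passes the
    // classOf[BaseRow].isAssignableFrom(clazz) branch: no converter is needed.
    val internal = t.bridgedTo(classOf[BaseRow])
    println(classOf[BaseRow].isAssignableFrom(internal.getConversionClass)) // true
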
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 51d3483..e972ad7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
@@ -31,7 +30,6 @@ import org.apache.flink.table.dataformat.util.BaseRowUtil
 import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
 import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
-import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -42,20 +40,6 @@ import org.apache.flink.types.Row
 
 object SinkCodeGenerator {
 
-  private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
-    try {
-      sink match {
-        // DataStreamTableSink has no generic class, so we need get the type to get type class.
-        case sink: DataStreamTableSink[_] => sink.getConsumedDataType.getConversionClass
-        case _ => TypeExtractor.createTypeInfo(sink, classOf[TableSink[_]], sink.getClass, 0)
-                  .getTypeClass.asInstanceOf[Class[_]]
-      }
-    } catch {
-      case _: InvalidTypesException =>
-        classOf[Object]
-    }
-  }
-
   /** Code gen a operator to convert internal type rows to external type. **/
   def generateRowConverterOperator[OUT](
       ctx: CodeGeneratorContext,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
index 35e13a7..f1deb87 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.dataformat.DataFormatConverters
 import org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GenerateUtils, GeneratedExpression}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
@@ -74,7 +76,7 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
         val javaTerm = newName("javaResult")
         // it maybe a Internal class, so use resultClass is most safety.
         val javaTypeTerm = resultClass.getCanonicalName
-        val internal = genToInternalIfNeeded(ctx, resultExternalType, resultClass, javaTerm)
+        val internal = genToInternalIfNeeded(ctx, resultExternalType, javaTerm)
         s"""
             |$javaTypeTerm $javaTerm = ($javaTypeTerm) $evalResult;
             |$resultTerm = $javaTerm == null ? null : ($internal);
@@ -106,29 +108,39 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
       func: ScalarFunction): Array[GeneratedExpression] = {
-
     // get the expanded parameter types
     var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
+    prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
+  }
+
+}
+
+object ScalarFunctionCallGen {
 
-    val signatureTypes = func
-        .getParameterTypes(paramClasses)
-        .zipWithIndex
-        .map {
-          case (t, i) =>
-            // we don't trust GenericType.
-            if (t.isInstanceOf[GenericTypeInfo[_]]) {
-              fromLogicalTypeToDataType(operands(i).resultType)
-            } else {
-              fromLegacyInfoToDataType(t)
-            }
+  def prepareFunctionArgs(
+      ctx: CodeGeneratorContext,
+      operands: Seq[GeneratedExpression],
+      parameterClasses: Array[Class[_]],
+      parameterTypes: Array[TypeInformation[_]]): Array[GeneratedExpression] = {
+
+    val signatureTypes = parameterTypes.zipWithIndex.map {
+      case (t: GenericTypeInfo[_], i) =>
+        // we don't trust GenericType, like Row and BaseRow and LocalTime
+        val returnType = fromLogicalTypeToDataType(operands(i).resultType)
+        if (operands(i).resultType.supportsOutputConversion(t.getTypeClass)) {
+          returnType.bridgedTo(t.getTypeClass)
+        } else {
+          returnType
         }
+      case (t, _) => fromLegacyInfoToDataType(t)
+    }
 
-    paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
+    parameterClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
       if (paramClass.isPrimitive) {
         operandExpr
       } else {
         val externalResultTerm = genToExternalIfNeeded(
-          ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
+          ctx, signatureTypes(i), operandExpr.resultTerm)
         val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
         operandExpr.copy(resultTerm = exprOrNull)
       }
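
The bridgedTo branch above is the substance of the change: when reflection
reports only a GenericTypeInfo for an eval() parameter, the planner keeps the
operand's logical type but, where the type supports it, bridges it to the
class the method actually declares. A small sketch using an existing pair of
conversion classes (TIMESTAMP(3) supports both java.time.LocalDateTime and
java.sql.Timestamp):

    import org.apache.flink.table.api.DataTypes

    val ts = DataTypes.TIMESTAMP(3)
    // Default conversion class: java.time.LocalDateTime.
    println(ts.getConversionClass)

    // An eval(java.sql.Timestamp) parameter is a supported output conversion,
    // so the planner bridges to it instead of using the default class.
    println(ts.getLogicalType.supportsOutputConversion(classOf[java.sql.Timestamp])) // true
    println(ts.bridgedTo(classOf[java.sql.Timestamp]).getConversionClass)
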
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
index ea7c074..b832d49 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
@@ -18,15 +18,12 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.table.functions.TableFunction
-import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternalIfNeeded
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NEVER_NULL
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getEvalMethodSignature
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions
 
 /**
   * Generates a call to user-defined [[TableFunction]].
@@ -62,32 +59,8 @@ class TableFunctionCallGen(tableFunction: TableFunction[_]) extends CallGenerato
       ctx: CodeGeneratorContext,
       operands: Seq[GeneratedExpression],
       func: TableFunction[_]): Array[GeneratedExpression] = {
-
     // get the expanded parameter types
     var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
-
-    val signatureTypes = func
-        .getParameterTypes(paramClasses)
-        .zipWithIndex
-        .map {
-          case (t, i) =>
-            // we don't trust GenericType.
-            if (t.isInstanceOf[GenericTypeInfo[_]]) {
-              fromLogicalTypeToDataType(operands(i).resultType)
-            } else {
-              TypeConversions.fromLegacyInfoToDataType(t)
-            }
-        }
-
-    paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
-      if (paramClass.isPrimitive) {
-        operandExpr
-      } else {
-        val externalResultTerm = genToExternalIfNeeded(
-          ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
-        val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
-        operandExpr.copy(resultTerm = exprOrNull)
-      }
-    }
+    prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index fe1ed46..1de25dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -171,7 +171,12 @@ object UserDefinedFunctionUtils {
       case (t: DataType, i) =>
         // we don't trust GenericType.
         if (fromDataTypeToLogicalType(t).getTypeRoot == LogicalTypeRoot.ANY) {
-          fromLogicalTypeToDataType(expectedTypes(i))
+          val returnType = fromLogicalTypeToDataType(expectedTypes(i))
+          if (expectedTypes(i).supportsOutputConversion(t.getConversionClass)) {
+            returnType.bridgedTo(t.getConversionClass)
+          } else {
+            returnType
+          }
         } else {
           t
         }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index 3238390..e7de711 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -110,9 +110,7 @@ class BatchExecBoundedStreamScan(
 
   def needInternalConversion: Boolean = {
     ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
-        ScanUtil.needsConversion(
-          boundedStreamTable.dataType,
-          boundedStreamTable.dataStream.getType.getTypeClass)
+        ScanUtil.needsConversion(boundedStreamTable.dataType)
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index 1e5c364..be8ac6f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -142,8 +142,7 @@ class BatchExecSink[T](
       // Sink's input must be BatchExecNode[BaseRow] now.
       case node: BatchExecNode[BaseRow] =>
         val plan = node.translateToPlan(planner)
-        val typeClass = extractTableSinkTypeClass(sink)
-        if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+        if (CodeGenUtils.isInternalClass(resultDataType)) {
           plan.asInstanceOf[Transformation[T]]
         } else {
           val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index f4b188c..646320e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableException
@@ -147,11 +146,7 @@ class BatchExecTableSourceScan(
       isStreamTable = false,
       tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(
-        tableSource.getProducedDataType,
-        TypeExtractor.createTypeInfo(
-          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
-          .getTypeClass.asInstanceOf[Class[_]])
+      ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
 
   def getEstimatedRowCount: lang.Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
index ca2ce5b..3306033 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
@@ -116,9 +116,7 @@ class StreamExecDataStreamScan(
 
     // when there is row time extraction expression, we need internal conversion
     // when the physical type of the input date stream is not BaseRow, we need internal conversion.
-    if (rowtimeExpr.isDefined || ScanUtil.needsConversion(
-      dataStreamTable.dataType,
-      dataStreamTable.dataStream.getType.getTypeClass)) {
+    if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) {
 
       // extract time if the index is -1 or -2.
       val (extractElement, resetElement) =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index 22a8ede..d505bf7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.{Table, TableException}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.SinkCodeGenerator.{extractTableSinkTypeClass, generateRowConverterOperator}
+import org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext}
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.`trait`.{AccMode, AccModeTraitDef}
@@ -233,8 +233,7 @@ class StreamExecSink[T](
     }
     val resultDataType = sink.getConsumedDataType
     val resultType = fromDataTypeToLegacyInfo(resultDataType)
-    val typeClass = extractTableSinkTypeClass(sink)
-    if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+    if (CodeGenUtils.isInternalClass(resultDataType)) {
       parTransformation.asInstanceOf[Transformation[T]]
     } else {
       val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bca83ce..32f0fd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
@@ -198,11 +197,7 @@ class StreamExecTableSourceScan(
       isStreamTable = true,
       tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(
-        tableSource.getProducedDataType,
-        TypeExtractor.createTypeInfo(
-          tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
-          .getTypeClass.asInstanceOf[Class[_]])
+      ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 2beee54..7c1013a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContex
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.RowType
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -50,9 +51,13 @@ object ScanUtil {
         indexes.contains(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)||
         indexes.contains(TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER)
 
-  private[flink] def needsConversion(dataType: DataType, clz: Class[_]): Boolean =
+  private[flink] def needsConversion(source: TableSource[_]): Boolean = {
+    needsConversion(source.getProducedDataType)
+  }
+
+  private[flink] def needsConversion(dataType: DataType): Boolean =
     fromDataTypeToLogicalType(dataType) match {
-      case _: RowType => !CodeGenUtils.isInternalClass(clz, dataType)
+      case _: RowType => !CodeGenUtils.isInternalClass(dataType)
       case _ => true
     }
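
With the class argument gone, whether a scan needs a conversion operator is
decided by the produced DataType alone. A sketch of the three cases, under the
same assumptions as the isInternalClass example above (needsConversion is
private[flink], so this is callable only from planner-internal code):

    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.dataformat.BaseRow

    val rowType = DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT()))
    ScanUtil.needsConversion(DataTypes.STRING())                  // true: not a row type
    ScanUtil.needsConversion(rowType)                             // true: external Row
    ScanUtil.needsConversion(rowType.bridgedTo(classOf[BaseRow])) // false: already internal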
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 60263e1..66ec0a1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,15 +23,14 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
 
-
 class TableSourceITCase extends BatchTestBase {
 
   @Before
@@ -40,7 +39,7 @@ class TableSourceITCase extends BatchTestBase {
     env.setParallelism(1) // set sink parallelism to 1
     val tableSchema = TableSchema.builder().fields(
       Array("a", "b", "c"),
-      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(Int.MaxValue))).build()
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
     tEnv.registerTableSource("MyTable", new TestProjectableTableSource(
       true,
       tableSchema,
@@ -205,4 +204,21 @@ class TableSourceITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testInputFormatSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c"),
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+    val tableSource = new TestInputFormatTableSource(
+      tableSchema, tableSchema.toRowType, TestData.smallData3)
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+    checkResult(
+      "SELECT a, c FROM MyInputFormatTable",
+      Seq(
+        row(1, "Hi"),
+        row(2, "Hello"),
+        row(3, "Hello world"))
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index 992d066..b1faaad 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
 import org.apache.flink.types.Row
 
 import org.junit.Assert._
@@ -381,4 +381,25 @@ class TableSourceITCase extends StreamingTestBase {
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testInputFormatSource(): Unit = {
+    val tableSchema = TableSchema.builder().fields(
+      Array("a", "b", "c"),
+      Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+    val tableSource = new TestInputFormatTableSource(
+      tableSchema, tableSchema.toRowType, TestData.smallData3)
+    tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+    val sink = new TestingAppendSink()
+    tEnv.sqlQuery("SELECT a, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1,Hi",
+      "2,Hello",
+      "3,Hello world"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 411b0aa..e4cb20c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.planner.utils
 
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
@@ -32,6 +36,8 @@ import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSource
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.types.Row
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
@@ -597,3 +603,19 @@ class TestPartitionableTableSource(
 
   override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
 }
+
+class TestInputFormatTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T]) extends InputFormatTableSource[T] {
+
+  override def getInputFormat: InputFormat[T, _ <: InputSplit] = {
+    new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)
+
+  override def getTableSchema: TableSchema = tableSchema
+}