You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/26 09:46:31 UTC
[flink] 01/02: [FLINK-13393][table-planner-blink] Fix generic
TableSource doesn't work in blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b16763f8faf6b3da7db3a00e301cabd130252b84
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 24 11:08:15 2019 +0800
[FLINK-13393][table-planner-blink] Fix generic TableSource doesn't work in blink planner
The reason is that the blink planner uses TypeExtractor to extract a class from the TableSource, and passes this class to the DataFormatConverter. However, blink should instead use the conversion class of the DataType returned by TableSource#getProducedDataType.
This closes #9211
---
.../flink/table/planner/codegen/CodeGenUtils.scala | 14 ++++----
.../table/planner/codegen/SinkCodeGenerator.scala | 16 ---------
.../codegen/calls/ScalarFunctionCallGen.scala | 42 ++++++++++++++--------
.../codegen/calls/TableFunctionCallGen.scala | 31 ++--------------
.../functions/utils/UserDefinedFunctionUtils.scala | 7 +++-
.../batch/BatchExecBoundedStreamScan.scala | 4 +--
.../plan/nodes/physical/batch/BatchExecSink.scala | 3 +-
.../physical/batch/BatchExecTableSourceScan.scala | 7 +---
.../physical/stream/StreamExecDataStreamScan.scala | 4 +--
.../nodes/physical/stream/StreamExecSink.scala | 5 ++-
.../stream/StreamExecTableSourceScan.scala | 7 +---
.../flink/table/planner/plan/utils/ScanUtil.scala | 9 +++--
.../runtime/batch/sql/TableSourceITCase.scala | 24 ++++++++++---
.../runtime/stream/sql/TableSourceITCase.scala | 27 ++++++++++++--
.../table/planner/utils/testTableSources.scala | 22 ++++++++++++
15 files changed, 122 insertions(+), 100 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 8da2b22..6f097f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -201,10 +201,12 @@ object CodeGenUtils {
* If it's internally compatible, don't need to DataStructure converter.
* clazz != classOf[Row] => Row can only infer GenericType[Row].
*/
- def isInternalClass(clazz: Class[_], t: DataType): Boolean =
+ def isInternalClass(t: DataType): Boolean = {
+ val clazz = t.getConversionClass
clazz != classOf[Object] && clazz != classOf[Row] &&
- (classOf[BaseRow].isAssignableFrom(clazz) ||
- clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+ (classOf[BaseRow].isAssignableFrom(clazz) ||
+ clazz == getInternalClassForType(fromDataTypeToLogicalType(t)))
+ }
def hashCodeForType(
ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
@@ -680,9 +682,8 @@ object CodeGenUtils {
def genToInternalIfNeeded(
ctx: CodeGeneratorContext,
t: DataType,
- clazz: Class[_],
term: String): String = {
- if (isInternalClass(clazz, t)) {
+ if (isInternalClass(t)) {
s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
} else {
genToInternal(ctx, t, term)
@@ -705,9 +706,8 @@ object CodeGenUtils {
def genToExternalIfNeeded(
ctx: CodeGeneratorContext,
t: DataType,
- clazz: Class[_],
term: String): String = {
- if (isInternalClass(clazz, t)) {
+ if (isInternalClass(t)) {
s"(${boxedTypeTermForType(fromDataTypeToLogicalType(t))}) $term"
} else {
genToExternal(ctx, t, term)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index 51d3483..e972ad7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.codegen
-import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
@@ -31,7 +30,6 @@ import org.apache.flink.table.dataformat.util.BaseRowUtil
import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
-import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -42,20 +40,6 @@ import org.apache.flink.types.Row
object SinkCodeGenerator {
- private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
- try {
- sink match {
- // DataStreamTableSink has no generic class, so we need get the type to get type class.
- case sink: DataStreamTableSink[_] => sink.getConsumedDataType.getConversionClass
- case _ => TypeExtractor.createTypeInfo(sink, classOf[TableSink[_]], sink.getClass, 0)
- .getTypeClass.asInstanceOf[Class[_]]
- }
- } catch {
- case _: InvalidTypesException =>
- classOf[Object]
- }
- }
-
/** Code gen a operator to convert internal type rows to external type. **/
def generateRowConverterOperator[OUT](
ctx: CodeGeneratorContext,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
index 35e13a7..f1deb87 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,11 +18,13 @@
package org.apache.flink.table.planner.codegen.calls
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.table.dataformat.DataFormatConverters
import org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.codegen.CodeGenUtils._
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GenerateUtils, GeneratedExpression}
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
@@ -74,7 +76,7 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
val javaTerm = newName("javaResult")
// it maybe a Internal class, so use resultClass is most safety.
val javaTypeTerm = resultClass.getCanonicalName
- val internal = genToInternalIfNeeded(ctx, resultExternalType, resultClass, javaTerm)
+ val internal = genToInternalIfNeeded(ctx, resultExternalType, javaTerm)
s"""
|$javaTypeTerm $javaTerm = ($javaTypeTerm) $evalResult;
|$resultTerm = $javaTerm == null ? null : ($internal);
@@ -106,29 +108,39 @@ class ScalarFunctionCallGen(scalarFunction: ScalarFunction) extends CallGenerato
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
func: ScalarFunction): Array[GeneratedExpression] = {
-
// get the expanded parameter types
var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
+ prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
+ }
+
+}
+
+object ScalarFunctionCallGen {
- val signatureTypes = func
- .getParameterTypes(paramClasses)
- .zipWithIndex
- .map {
- case (t, i) =>
- // we don't trust GenericType.
- if (t.isInstanceOf[GenericTypeInfo[_]]) {
- fromLogicalTypeToDataType(operands(i).resultType)
- } else {
- fromLegacyInfoToDataType(t)
- }
+ def prepareFunctionArgs(
+ ctx: CodeGeneratorContext,
+ operands: Seq[GeneratedExpression],
+ parameterClasses: Array[Class[_]],
+ parameterTypes: Array[TypeInformation[_]]): Array[GeneratedExpression] = {
+
+ val signatureTypes = parameterTypes.zipWithIndex.map {
+ case (t: GenericTypeInfo[_], i) =>
+ // we don't trust GenericType, like Row and BaseRow and LocalTime
+ val returnType = fromLogicalTypeToDataType(operands(i).resultType)
+ if (operands(i).resultType.supportsOutputConversion(t.getTypeClass)) {
+ returnType.bridgedTo(t.getTypeClass)
+ } else {
+ returnType
}
+ case (t, _) => fromLegacyInfoToDataType(t)
+ }
- paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
+ parameterClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
if (paramClass.isPrimitive) {
operandExpr
} else {
val externalResultTerm = genToExternalIfNeeded(
- ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
+ ctx, signatureTypes(i), operandExpr.resultTerm)
val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
operandExpr.copy(resultTerm = exprOrNull)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
index ea7c074..b832d49 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/TableFunctionCallGen.scala
@@ -18,15 +18,12 @@
package org.apache.flink.table.planner.codegen.calls
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.table.functions.TableFunction
-import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternalIfNeeded
import org.apache.flink.table.planner.codegen.GeneratedExpression.NEVER_NULL
+import org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.prepareFunctionArgs
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getEvalMethodSignature
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions
/**
* Generates a call to user-defined [[TableFunction]].
@@ -62,32 +59,8 @@ class TableFunctionCallGen(tableFunction: TableFunction[_]) extends CallGenerato
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
func: TableFunction[_]): Array[GeneratedExpression] = {
-
// get the expanded parameter types
var paramClasses = getEvalMethodSignature(func, operands.map(_.resultType).toArray)
-
- val signatureTypes = func
- .getParameterTypes(paramClasses)
- .zipWithIndex
- .map {
- case (t, i) =>
- // we don't trust GenericType.
- if (t.isInstanceOf[GenericTypeInfo[_]]) {
- fromLogicalTypeToDataType(operands(i).resultType)
- } else {
- TypeConversions.fromLegacyInfoToDataType(t)
- }
- }
-
- paramClasses.zipWithIndex.zip(operands).map { case ((paramClass, i), operandExpr) =>
- if (paramClass.isPrimitive) {
- operandExpr
- } else {
- val externalResultTerm = genToExternalIfNeeded(
- ctx, signatureTypes(i), paramClass, operandExpr.resultTerm)
- val exprOrNull = s"${operandExpr.nullTerm} ? null : ($externalResultTerm)"
- operandExpr.copy(resultTerm = exprOrNull)
- }
- }
+ prepareFunctionArgs(ctx, operands, paramClasses, func.getParameterTypes(paramClasses))
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index fe1ed46..1de25dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -171,7 +171,12 @@ object UserDefinedFunctionUtils {
case (t: DataType, i) =>
// we don't trust GenericType.
if (fromDataTypeToLogicalType(t).getTypeRoot == LogicalTypeRoot.ANY) {
- fromLogicalTypeToDataType(expectedTypes(i))
+ val returnType = fromLogicalTypeToDataType(expectedTypes(i))
+ if (expectedTypes(i).supportsOutputConversion(t.getConversionClass)) {
+ returnType.bridgedTo(t.getConversionClass)
+ } else {
+ returnType
+ }
} else {
t
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
index 3238390..e7de711 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecBoundedStreamScan.scala
@@ -110,9 +110,7 @@ class BatchExecBoundedStreamScan(
def needInternalConversion: Boolean = {
ScanUtil.hasTimeAttributeField(boundedStreamTable.fieldIndexes) ||
- ScanUtil.needsConversion(
- boundedStreamTable.dataType,
- boundedStreamTable.dataStream.getType.getTypeClass)
+ ScanUtil.needsConversion(boundedStreamTable.dataType)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
index 1e5c364..be8ac6f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSink.scala
@@ -142,8 +142,7 @@ class BatchExecSink[T](
// Sink's input must be BatchExecNode[BaseRow] now.
case node: BatchExecNode[BaseRow] =>
val plan = node.translateToPlan(planner)
- val typeClass = extractTableSinkTypeClass(sink)
- if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+ if (CodeGenUtils.isInternalClass(resultDataType)) {
plan.asInstanceOf[Transformation[T]]
} else {
val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index f4b188c..646320e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableException
@@ -147,11 +146,7 @@ class BatchExecTableSourceScan(
isStreamTable = false,
tableSourceTable.selectedFields)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
- ScanUtil.needsConversion(
- tableSource.getProducedDataType,
- TypeExtractor.createTypeInfo(
- tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
- .getTypeClass.asInstanceOf[Class[_]])
+ ScanUtil.needsConversion(tableSource.getProducedDataType)
}
def getEstimatedRowCount: lang.Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
index ca2ce5b..3306033 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDataStreamScan.scala
@@ -116,9 +116,7 @@ class StreamExecDataStreamScan(
// when there is row time extraction expression, we need internal conversion
// when the physical type of the input date stream is not BaseRow, we need internal conversion.
- if (rowtimeExpr.isDefined || ScanUtil.needsConversion(
- dataStreamTable.dataType,
- dataStreamTable.dataStream.getType.getTypeClass)) {
+ if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) {
// extract time if the index is -1 or -2.
val (extractElement, resetElement) =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index 22a8ede..d505bf7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.{Table, TableException}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.SinkCodeGenerator.{extractTableSinkTypeClass, generateRowConverterOperator}
+import org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator
import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext}
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.`trait`.{AccMode, AccModeTraitDef}
@@ -233,8 +233,7 @@ class StreamExecSink[T](
}
val resultDataType = sink.getConsumedDataType
val resultType = fromDataTypeToLegacyInfo(resultDataType)
- val typeClass = extractTableSinkTypeClass(sink)
- if (CodeGenUtils.isInternalClass(typeClass, resultDataType)) {
+ if (CodeGenUtils.isInternalClass(resultDataType)) {
parTransformation.asInstanceOf[Transformation[T]]
} else {
val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bca83ce..32f0fd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
@@ -198,11 +197,7 @@ class StreamExecTableSourceScan(
isStreamTable = true,
tableSourceTable.selectedFields)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
- ScanUtil.needsConversion(
- tableSource.getProducedDataType,
- TypeExtractor.createTypeInfo(
- tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
- .getTypeClass.asInstanceOf[Class[_]])
+ ScanUtil.needsConversion(tableSource.getProducedDataType)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 2beee54..7c1013a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContex
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -50,9 +51,13 @@ object ScanUtil {
indexes.contains(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)||
indexes.contains(TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER)
- private[flink] def needsConversion(dataType: DataType, clz: Class[_]): Boolean =
+ private[flink] def needsConversion(source: TableSource[_]): Boolean = {
+ needsConversion(source.getProducedDataType)
+ }
+
+ private[flink] def needsConversion(dataType: DataType): Boolean =
fromDataTypeToLogicalType(dataType) match {
- case _: RowType => !CodeGenUtils.isInternalClass(clz, dataType)
+ case _: RowType => !CodeGenUtils.isInternalClass(dataType)
case _ => true
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index 60263e1..66ec0a1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -23,15 +23,14 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
import org.apache.flink.types.Row
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
-
class TableSourceITCase extends BatchTestBase {
@Before
@@ -40,7 +39,7 @@ class TableSourceITCase extends BatchTestBase {
env.setParallelism(1) // set sink parallelism to 1
val tableSchema = TableSchema.builder().fields(
Array("a", "b", "c"),
- Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(Int.MaxValue))).build()
+ Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
tEnv.registerTableSource("MyTable", new TestProjectableTableSource(
true,
tableSchema,
@@ -205,4 +204,21 @@ class TableSourceITCase extends BatchTestBase {
)
)
}
+
+ @Test
+ def testInputFormatSource(): Unit = {
+ val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c"),
+ Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+ val tableSource = new TestInputFormatTableSource(
+ tableSchema, tableSchema.toRowType, TestData.smallData3)
+ tEnv.registerTableSource("MyInputFormatTable", tableSource)
+ checkResult(
+ "SELECT a, c FROM MyInputFormatTable",
+ Seq(
+ row(1, "Hi"),
+ row(2, "Hello"),
+ row(3, "Hello world"))
+ )
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index 992d066..b1faaad 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendSink}
-import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink}
+import org.apache.flink.table.planner.utils.{TestFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableTableSource, TestProjectableTableSource, TestTableSources}
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -381,4 +381,25 @@ class TableSourceITCase extends StreamingTestBase {
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+ @Test
+ def testInputFormatSource(): Unit = {
+ val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c"),
+ Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+ val tableSource = new TestInputFormatTableSource(
+ tableSchema, tableSchema.toRowType, TestData.smallData3)
+ tEnv.registerTableSource("MyInputFormatTable", tableSource)
+
+ val sink = new TestingAppendSink()
+ tEnv.sqlQuery("SELECT a, c FROM MyInputFormatTable").toAppendStream[Row].addSink(sink)
+
+ env.execute()
+
+ val expected = Seq(
+ "1,Hi",
+ "2,Hello",
+ "3,Hello world"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 411b0aa..e4cb20c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -18,8 +18,12 @@
package org.apache.flink.table.planner.utils
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.io.InputSplit
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
@@ -32,6 +36,8 @@ import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSource
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.flink.types.Row
import java.io.{File, FileOutputStream, OutputStreamWriter}
@@ -597,3 +603,19 @@ class TestPartitionableTableSource(
override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
}
+
+class TestInputFormatTableSource[T](
+ tableSchema: TableSchema,
+ returnType: TypeInformation[T],
+ values: Seq[T]) extends InputFormatTableSource[T] {
+
+ override def getInputFormat: InputFormat[T, _ <: InputSplit] = {
+ new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
+ }
+
+ override def getReturnType: TypeInformation[T] = returnType
+
+ override def getProducedDataType: DataType = fromLegacyInfoToDataType(returnType)
+
+ override def getTableSchema: TableSchema = tableSchema
+}