You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:14 UTC
[flink] 02/05: [FLINK-13495][table-planner-blink] Deal with
InputFormatTableSource in planner to use planner type convertion to keep
precision
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ebf18db29b9b8480459e20b307e07980657e75e5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:05:14 2019 +0200
[FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type convertion to keep precision
---
.../plan/nodes/physical/PhysicalTableSourceScan.scala | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
index aa85e2c..50fa428 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
@@ -18,11 +18,15 @@
package org.apache.flink.table.planner.plan.nodes.physical
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
+import org.apache.flink.core.io.InputSplit
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.sources.{InputFormatTableSource, StreamTableSource, TableSource}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelWriter
@@ -60,8 +64,16 @@ abstract class PhysicalTableSourceScan(
def getSourceTransformation(
streamEnv: StreamExecutionEnvironment): Transformation[_] = {
if (sourceTransform == null) {
- sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
- getDataStream(streamEnv).getTransformation
+ sourceTransform = tableSource match {
+ case format: InputFormatTableSource[_] =>
+ // we don't use InputFormatTableSource.getDataStream, because in here we use planner
+ // type conversion to support precision of Varchar and something else.
+ streamEnv.createInput(
+ format.getInputFormat.asInstanceOf[InputFormat[Any, _ <: InputSplit]],
+ fromDataTypeToTypeInfo(format.getProducedDataType).asInstanceOf[TypeInformation[Any]]
+ ).name(format.explainSource()).getTransformation
+ case s: StreamTableSource[_] => s.getDataStream(streamEnv).getTransformation
+ }
}
sourceTransform
}