You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 14:37:14 UTC

[flink] 02/05: [FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type conversion to keep precision

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebf18db29b9b8480459e20b307e07980657e75e5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Fri Aug 2 18:05:14 2019 +0200

    [FLINK-13495][table-planner-blink] Deal with InputFormatTableSource in planner to use planner type conversion to keep precision
---
 .../plan/nodes/physical/PhysicalTableSourceScan.scala  | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
index aa85e2c..50fa428 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/PhysicalTableSourceScan.scala
@@ -18,11 +18,15 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical
 
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.sources.{InputFormatTableSource, StreamTableSource, TableSource}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
@@ -60,8 +64,16 @@ abstract class PhysicalTableSourceScan(
   def getSourceTransformation(
       streamEnv: StreamExecutionEnvironment): Transformation[_] = {
     if (sourceTransform == null) {
-      sourceTransform = tableSource.asInstanceOf[StreamTableSource[_]].
-        getDataStream(streamEnv).getTransformation
+      sourceTransform = tableSource match {
+        case format: InputFormatTableSource[_] =>
+          // Do not call InputFormatTableSource.getDataStream here: the planner's own type
+          // conversion is used instead so that type precision (e.g. of VARCHAR) is kept.
+          streamEnv.createInput(
+            format.getInputFormat.asInstanceOf[InputFormat[Any, _ <: InputSplit]],
+            fromDataTypeToTypeInfo(format.getProducedDataType).asInstanceOf[TypeInformation[Any]]
+          ).name(format.explainSource()).getTransformation
+        case s: StreamTableSource[_] => s.getDataStream(streamEnv).getTransformation
+      }
     }
     sourceTransform
   }