You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/06 12:42:52 UTC

[4/4] flink git commit: [FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.

[FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.

This closes #2934.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98d18260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98d18260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98d18260

Branch: refs/heads/master
Commit: 98d1826030d1486a3d64466aff1c909a41e2de10
Parents: 6f9633c
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Dec 5 09:43:13 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:28 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/datastream/DataStreamScan.scala   |  6 ++++--
 .../table/plan/nodes/datastream/StreamScan.scala |  5 +----
 .../nodes/datastream/StreamTableSourceScan.scala | 19 ++++++++++---------
 .../datastream/StreamTableSourceScanRule.scala   |  6 +++++-
 4 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
index 463e1bc..da83b64 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
@@ -35,11 +35,13 @@ class DataStreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
-  extends StreamScan(cluster, traitSet, table, rowType) {
+    rowRelDataType: RelDataType)
+  extends StreamScan(cluster, traitSet, table) {
 
   val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
+  override def deriveRowType() = rowRelDataType
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
       cluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
index 17620d0..b13770e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
@@ -37,13 +37,10 @@ import scala.collection.JavaConverters._
 abstract class StreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    table: RelOptTable,
-    rowRelDataType: RelDataType)
+    table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
   with DataStreamRel {
 
-  override def deriveRowType() = rowRelDataType
-
   protected def convertToExpectedType(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 21b8a63..8201070 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -20,11 +20,10 @@ package org.apache.flink.api.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
 import org.apache.flink.api.table.plan.schema.TableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
 import org.apache.flink.streaming.api.datastream.DataStream
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -32,18 +31,20 @@ class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
-  extends StreamScan(cluster, traitSet, table, rowType) {
+    tableSource: StreamTableSource[_])
+  extends StreamScan(cluster, traitSet, table) {
 
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
-  val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+  }
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new StreamTableSourceScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      tableSource
     )
   }
 
@@ -55,7 +56,7 @@ class StreamTableSourceScan(
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config)
+    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 9d8075c..91dd255 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -58,11 +58,15 @@ class StreamTableSourceScanRule
     val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
 
+    // The original registered table source
+    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
+
     new StreamTableSourceScan(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      rel.getRowType
+      tableSource
     )
   }
 }