You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/06 12:42:52 UTC
[4/4] flink git commit: [FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.
[FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable.
This closes #2934.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98d18260
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98d18260
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98d18260
Branch: refs/heads/master
Commit: 98d1826030d1486a3d64466aff1c909a41e2de10
Parents: 6f9633c
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Dec 5 09:43:13 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Dec 6 11:23:28 2016 +0100
----------------------------------------------------------------------
.../plan/nodes/datastream/DataStreamScan.scala | 6 ++++--
.../table/plan/nodes/datastream/StreamScan.scala | 5 +----
.../nodes/datastream/StreamTableSourceScan.scala | 19 ++++++++++---------
.../datastream/StreamTableSourceScanRule.scala | 6 +++++-
4 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
index 463e1bc..da83b64 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
@@ -35,11 +35,13 @@ class DataStreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- rowType: RelDataType)
- extends StreamScan(cluster, traitSet, table, rowType) {
+ rowRelDataType: RelDataType)
+ extends StreamScan(cluster, traitSet, table) {
val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
+ override def deriveRowType() = rowRelDataType
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamScan(
cluster,
http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
index 17620d0..b13770e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
@@ -37,13 +37,10 @@ import scala.collection.JavaConverters._
abstract class StreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- table: RelOptTable,
- rowRelDataType: RelDataType)
+ table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with DataStreamRel {
- override def deriveRowType() = rowRelDataType
-
protected def convertToExpectedType(
input: DataStream[Any],
flinkTable: FlinkTable[_],
http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 21b8a63..8201070 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -20,11 +20,10 @@ package org.apache.flink.api.table.plan.nodes.datastream
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.api.table.plan.schema.TableSourceTable
import org.apache.flink.api.table.sources.StreamTableSource
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -32,18 +31,20 @@ class StreamTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- rowType: RelDataType)
- extends StreamScan(cluster, traitSet, table, rowType) {
+ tableSource: StreamTableSource[_])
+ extends StreamScan(cluster, traitSet, table) {
- val tableSourceTable = table.unwrap(classOf[TableSourceTable])
- val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
+ override def deriveRowType() = {
+ val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+ }
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamTableSourceScan(
cluster,
traitSet,
- table,
- rowType
+ getTable,
+ tableSource
)
}
@@ -55,7 +56,7 @@ class StreamTableSourceScan(
val inputDataStream: DataStream[Any] = tableSource
.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config)
+ convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98d18260/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 9d8075c..91dd255 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -58,11 +58,15 @@ class StreamTableSourceScanRule
val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+ // The original registered table source
+ val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+ val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
+
new StreamTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
- rel.getRowType
+ tableSource
)
}
}