Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/09/01 20:12:52 UTC

[GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/4635

    [FLINK-7571] [table] Fix translation of TableSource with time indicators

    ## What is the purpose of the change
    
    This PR fixes the translation / code generation for table scans against table sources that include time indicator fields. This behavior was broken during the recent time indicator refactoring.
    I added a test to ensure the feature remains working.
    
    ## Brief change log
    
    - Add an ITCase to ensure that table sources with time indicators are correctly executed
    - Fix the registration of table sources with time indicators
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    Run `TimeAttributesITCase.testTableSourceWithTimeIndicators()`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **n/a**
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableTSFix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4635.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4635
    
----
commit c7f6d069c8224ee756bcbd66305ad84d2a7ac8d3
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-09-01T19:41:21Z

    [FLINK-7571] [table] Fix translation of TableSource with time indicators.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139633811
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---
    @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
     class StreamTableSourceTable[T](
         override val tableSource: TableSource[T],
         override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
    -  extends TableSourceTable[T](tableSource, statistic) {
    -
    +  extends TableSourceTable[T](
    +    tableSource,
    +    StreamTableSourceTable.adjustFieldIndexes(tableSource),
    +    StreamTableSourceTable.adjustFieldNames(tableSource),
    +    statistic) {
     
       override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
    +    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
    +
         val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
    +    flinkTypeFactory.buildLogicalRowType(
    +      this.fieldNames,
    +      fieldTypes)
    +  }
     
    -    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
    -    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    +}
     
    -    val fields = fieldNames.zip(fieldTypes)
    +object StreamTableSourceTable {
    +
    +  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
    +    val (rowtime, proctime) = getTimeIndicators(tableSource)
    +
    +    val original = TableEnvironment.getFieldIndices(tableSource)
    +
    +    // append rowtime marker
    +    val withRowtime = if (rowtime.isDefined) {
    +      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
    --- End diff --
    
    That's how rowtime fields are handled at the moment. They are always appended at the end of the row.
    
    There is a JIRA to use existing fields as rowtime fields (https://issues.apache.org/jira/browse/FLINK-7446) but this has not been implemented yet. I'm currently working on a PR for that.
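The "always appended at the end of the row" behavior can be illustrated with a small, self-contained Scala sketch. It is not the code from this PR: the object name `AppendTimeIndicators`, the marker values `-1`/`-2`, and the rowtime-before-proctime order are assumptions made for illustration (the diff only shows that the real rowtime marker is `TimeIndicatorTypeInfo.ROWTIME_MARKER`).

```scala
// Hypothetical sketch, not Flink's implementation: time indicators are
// appended after the physical fields of a table source.
object AppendTimeIndicators {

  // Assumed marker values standing in for Flink's TimeIndicatorTypeInfo markers.
  val RowtimeMarker: Int = -1
  val ProctimeMarker: Int = -2

  /** Appends the rowtime marker, then the proctime marker, to the physical field indices. */
  def adjustFieldIndexes(
      original: Array[Int],
      rowtime: Option[String],
      proctime: Option[String]): Array[Int] = {
    val withRowtime = if (rowtime.isDefined) original :+ RowtimeMarker else original
    if (proctime.isDefined) withRowtime :+ ProctimeMarker else withRowtime
  }

  /** Appends the time attribute names in the same order as the markers. */
  def adjustFieldNames(
      original: Array[String],
      rowtime: Option[String],
      proctime: Option[String]): Array[String] =
    original ++ rowtime.toSeq ++ proctime.toSeq

  def main(args: Array[String]): Unit = {
    val indices = adjustFieldIndexes(Array(0, 1, 2), Some("rowtime"), Some("proctime"))
    val names = adjustFieldNames(Array("a", "b", "c"), Some("rowtime"), Some("proctime"))
    println(indices.mkString(", ")) // 0, 1, 2, -1, -2
    println(names.mkString(", "))   // a, b, c, rowtime, proctime
  }
}
```

Because indices and names are extended in lockstep, position `i` in both arrays still describes the same logical field; the row simply grows by one column per time indicator.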


---

[GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...

Posted by uybhatti <gi...@git.apache.org>.
Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139630218
  
    
    For rowtime, we need to replace the index of a specific field, but here you are appending the index at the end of the row. The same applies to the field names and field types.
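A hypothetical sketch of the alternative the reviewer describes: instead of appending a marker, the physical index of an existing field is overwritten with the rowtime marker, so the row keeps its arity and the field names stay unchanged. `ReplaceRowtimeField`, `replaceRowtimeIndex`, and the marker value `-1` are illustrative assumptions, not Flink API.

```scala
// Hypothetical sketch: mark an existing field as the rowtime attribute by
// replacing its physical index with a marker instead of appending a new field.
object ReplaceRowtimeField {

  // Assumed marker value standing in for TimeIndicatorTypeInfo.ROWTIME_MARKER.
  val RowtimeMarker: Int = -1

  /** Returns a copy of `indices` where the named field's index is replaced by the marker. */
  def replaceRowtimeIndex(
      names: Array[String],
      indices: Array[Int],
      rowtimeField: String): Array[Int] = {
    val pos = names.indexOf(rowtimeField)
    require(pos >= 0, s"Unknown rowtime field: $rowtimeField")
    indices.updated(pos, RowtimeMarker)
  }

  def main(args: Array[String]): Unit = {
    val names = Array("a", "ts", "c")
    val indices = replaceRowtimeIndex(names, Array(0, 1, 2), "ts")
    println(indices.mkString(", ")) // 0, -1, 2
  }
}
```

With this approach only the index (and, correspondingly, the field type) at the chosen position changes, which is why the same adjustment would also have to be applied to the field types.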


---

[GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4635


---

[GitHub] flink issue #4635: [FLINK-7571] [table] Fix translation of TableSource with ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4635
  
    Hi @fhueske , thanks for the work! It is a very important fix! I'm fine with the changes.
    
    +1 to merge


---

[GitHub] flink issue #4635: [FLINK-7571] [table] Fix translation of TableSource with ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4635
  
    @wuchong can you have a look at this PR? 
    It fixes the code generation for table sources with time attributes. 
    Thanks!


---

[GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...

Posted by uybhatti <gi...@git.apache.org>.
Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139661930
  
    
    Thanks. Actually, we can define an existing field as the rowtime field when we convert a DataStream to a Table. That's why I was a little bit confused, but since there is a PR for TableSource, that's good.


---

[GitHub] flink issue #4635: [FLINK-7571] [table] Fix translation of TableSource with ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4635
  
    Thanks for the review @wuchong!
    
    Merging


---