Posted to issues@flink.apache.org by "Eric Xiao (Jira)" <ji...@apache.org> on 2022/11/03 14:58:00 UTC

[jira] [Updated] (FLINK-29837) SQL API does not expose the RowKind of the Row for processing Changelogs

     [ https://issues.apache.org/jira/browse/FLINK-29837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eric Xiao updated FLINK-29837:
------------------------------
    Description: 
When working with {{ChangeLog}} data in the SQL API, it is a bit misleading that the {{op}} column appears{^}[1]{^} in the table schema of printed results, but is not available for use in SQL queries:
{code:java}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// create environments for both the DataStream and Table APIs
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table =
  tableEnv.fromChangelogStream(dataStream, Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.upsert())

// register the table under a name and filter on the change flag
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print() {code}
The error logs:

{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675){code}
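For context, the {{op}} column does show up when the table is printed directly, which is what makes the error above surprising. A minimal illustration, with the output sketched from the linked docs example{^}[1]{^} (exact formatting may differ):
{code:java}
// Printing the table without the filter shows the change flag in the 'op' column:
tableEnv.from("InputTable").execute().print()

// +----+--------------------------------+-------------+
// | op |                             f0 |          f1 |
// +----+--------------------------------+-------------+
// | +I |                          Alice |          12 |
// | +I |                            Bob |           5 |
// | +U |                          Alice |         100 |
// +----+--------------------------------+-------------+
{code}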
It would be nice to expose the {{op}} column so that it can be used in the Flink SQL API, just as the row kind is already accessible in the DataStream API.
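For comparison, a minimal sketch of how the row kind can already be read through the DataStream API, where it is carried on each {{Row}} rather than exposed as a column (assuming the same {{table}} as above):
{code:java}
import org.apache.flink.types.RowKind

// Convert the Table back to a changelog stream; each Row carries its
// RowKind, so the change flag can be inspected or filtered here.
val insertsOnly = tableEnv
  .toChangelogStream(table)
  .filter(row => row.getKind == RowKind.INSERT)

insertsOnly.print()
{code}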

[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream] 

> SQL API does not expose the RowKind of the Row for processing Changelogs
> ------------------------------------------------------------------------
>
>                 Key: FLINK-29837
>                 URL: https://issues.apache.org/jira/browse/FLINK-29837
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: Eric Xiao
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)