Posted to issues@flink.apache.org by "Eric Xiao (Jira)" <ji...@apache.org> on 2022/11/03 14:58:00 UTC
[jira] [Updated] (FLINK-29837) SQL API does not expose the RowKind of the Row for processing Changelogs
[ https://issues.apache.org/jira/browse/FLINK-29837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eric Xiao updated FLINK-29837:
------------------------------
Description:
When working with {{ChangeLog}} data in the SQL API, it is misleading that the {{op}} column appears{^}[1]{^} in the table schema of the printed results but cannot be referenced from SQL queries:
{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(
  dataStream,
  Schema.newBuilder().primaryKey("f0").build(),
  ChangelogMode.upsert())

// register the table under a name and filter by row kind
tableEnv.createTemporaryView("InputTable", table)

tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print()
{code}
The resulting error:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675){code}
It would be nice to expose the {{op}} column so it can be used in the Flink SQL API, as the row kind already is in the DataStream API.
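Until the row kind is exposed to SQL, one possible workaround (a sketch, not part of this issue; the {{InputTableWithOp}} view name and the default {{f0}}/{{f1}}/{{f2}} column names are illustrative) is to materialize each Row's {{RowKind}} into an ordinary column on the DataStream side before converting to a table:
{code:java}
// Workaround sketch: copy each Row's RowKind into a regular STRING column
// so it becomes queryable from SQL. RowKind#shortString returns
// "+I", "-U", "+U" or "-D".
val withOp = dataStream.map { row =>
  Row.of(row.getKind.shortString(), row.getField(0), row.getField(1))
}(Types.ROW(Types.STRING, Types.STRING, Types.INT))

// The op value is now an explicit first column (f0) of an append-only stream.
tableEnv.createTemporaryView("InputTableWithOp", tableEnv.fromDataStream(withOp))
tableEnv
  .sqlQuery("SELECT * FROM InputTableWithOp WHERE f0 = '+I'")
  .execute()
  .print()
{code}
Note that this turns the changelog into an append-only stream, so the upsert semantics of {{fromChangelogStream}} (primary-key deduplication, retractions) are lost; it only makes the row kind observable.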
[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]
was:
When working with {{ChangeLog}} data in the SQL API, it is misleading that the {{op}} column appears{^}[1]{^} in the table schema of the printed results but cannot be referenced from SQL queries:
{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(
  dataStream,
  Schema.newBuilder().primaryKey("f0").build(),
  ChangelogMode.upsert())

// register the table under a name and filter by row kind
tableEnv.createTemporaryView("InputTable", table)

tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print()
{code}
The resulting error:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675)
at com.shopify.trickle.pipelines.IteratorPipeline$.delayedEndpoint$com$shopify$trickle$pipelines$IteratorPipeline$1(IteratorPipeline.scala:32)
at com.shopify.trickle.pipelines.IteratorPipeline$delayedInit$body.apply(IteratorPipeline.scala:11)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:431)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at com.shopify.trickle.pipelines.IteratorPipeline$.main(IteratorPipeline.scala:11)
at com.shopify.trickle.pipelines.IteratorPipeline.main(IteratorPipeline.scala) {code}
It would be nice to expose the {{op}} column so it can be used in the Flink SQL API, as the row kind already is in the DataStream API.
[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]
> SQL API does not expose the RowKind of the Row for processing Changelogs
> ------------------------------------------------------------------------
>
> Key: FLINK-29837
> URL: https://issues.apache.org/jira/browse/FLINK-29837
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.16.0
> Reporter: Eric Xiao
> Priority: Major
>
> When working with {{ChangeLog}} data in the SQL API, it is misleading that the {{op}} column appears{^}[1]{^} in the table schema of the printed results but cannot be referenced from SQL queries:
> {code:java}
> val tableEnv = StreamTableEnvironment.create(env)
>
> val dataStream = env.fromElements(
>   Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
>   Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
>   Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
> )(Types.ROW(Types.STRING, Types.INT))
>
> // interpret the DataStream as a Table
> val table = tableEnv.fromChangelogStream(
>   dataStream,
>   Schema.newBuilder().primaryKey("f0").build(),
>   ChangelogMode.upsert())
>
> // register the table under a name and filter by row kind
> tableEnv.createTemporaryView("InputTable", table)
>
> tableEnv
>   .sqlQuery("SELECT * FROM InputTable where op = '+I'")
>   .execute()
>   .print()
> {code}
> The resulting error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675){code}
> It would be nice to expose the {{op}} column so it can be used in the Flink SQL API, as the row kind already is in the DataStream API.
> [1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)