You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Eric Xiao (Jira)" <ji...@apache.org> on 2022/11/01 23:59:00 UTC

[jira] [Created] (FLINK-29837) SQL API does not expose the RowKind of the Row for processing Changelogs

Eric Xiao created FLINK-29837:
---------------------------------

             Summary: SQL API does not expose the RowKind of the Row for processing Changelogs
                 Key: FLINK-29837
                 URL: https://issues.apache.org/jira/browse/FLINK-29837
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.16.0
            Reporter: Eric Xiao


When working with `{{{}ChangeLog{}}}` data in the SQL API it was a bit misleading to see that the `{{{}op{}}}` column appears{^}[1]{^}  the type of  in the table schema of print results but it is not available to be used in a the SQL API:
{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table =
  tableEnv.fromChangelogStream(dataStream, Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.upsert())

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print() {code}
The error logs.

 

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675)
    at com.shopify.trickle.pipelines.IteratorPipeline$.delayedEndpoint$com$shopify$trickle$pipelines$IteratorPipeline$1(IteratorPipeline.scala:32)
    at com.shopify.trickle.pipelines.IteratorPipeline$delayedInit$body.apply(IteratorPipeline.scala:11)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at com.shopify.trickle.pipelines.IteratorPipeline$.main(IteratorPipeline.scala:11)
    at com.shopify.trickle.pipelines.IteratorPipeline.main(IteratorPipeline.scala) {code}
It would be nice to expose the `op` column to be usable in the Flink SQL APIs as it is in the DataStream APIs.

[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)