Posted to user@flink.apache.org by Gyula Fóra <gy...@gmail.com> on 2020/03/05 13:17:16 UTC

Writing retract streams to Kafka

Hi All!

Excuse my stupid question; I am pretty new to the Table/SQL API and I am
trying to play around with it by implementing and running a few use-cases.

I have a simple window join + aggregation, grouped on some id, whose result
I want to write to Kafka, but I am hitting the following error:

"AppendStreamTableSink requires that Table has only insert changes."

If I understand correctly, the problem here is that since updates are
possible within a single group, we have a retract stream and the Kafka sink
cannot handle that. I searched for a solution but haven't found any
satisfying answers.

How can I simply tell the INSERT logic to ignore previous values and just
always send the latest value (like what you would see in the CLI output)?

Thank you!
Gyula
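
As the replies below explain, the Kafka table sink in Flink 1.10 is
append-only, so an updating query like this cannot be written to it directly
from SQL. One workaround that comes up later in the thread, sketched here as
a rough outline rather than a tested program, is to run the query through the
Table API, convert the result to a retract stream, drop the retraction
messages, and write the remaining inserts to Kafka yourself. The topic name,
broker address, and serialization below are placeholders taken from the
thread.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UpsertsToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The updating join + aggregation; the DDL for ItemTransactions and
        // Queries is assumed to have been registered beforehand.
        Table result = tEnv.sqlQuery(
                "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
                + "FROM ItemTransactions AS t, Queries AS q "
                + "WHERE t.itemId = q.itemId "
                + "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
                + "GROUP BY t.itemId, q.event_time, q.queryId");

        // Each element is (true, row) for an insert/accumulate and (false, row) for a retraction.
        DataStream<Tuple2<Boolean, Row>> changes = tEnv.toRetractStream(result, Row.class);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<broker>");

        changes
                .filter(change -> change.f0)          // keep inserts, drop retractions/deletes
                .map(change -> change.f1.toString())  // placeholder serialization; use JSON in practice
                .addSink(new FlinkKafkaProducer<>("query.output.log.1", new SimpleStringSchema(), props));

        env.execute("upserts-to-kafka");
    }
}

This is conceptually the same as the RetractStreamTableSink wrapper suggested
by Benoît further down in the thread, just done inline instead of behind a
sink interface.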

Re: Writing retract streams to Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks Kurt, I came to the same conclusions after trying what Jark
provided. I can get similar behaviour if I reduce the grouping window to 1
sec but still keep the join window large.

Gyula
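
For illustration only, the variant Gyula describes might look roughly like
the following when submitted through the Table API: Jark's tumbling-window
query from further down in the thread, but with a 1-second grouping window
while the interval join still looks back 5 seconds. This is a sketch, not
taken verbatim from the thread; the table definitions are assumed to match
the DDL shown below, and tEnv is a StreamTableEnvironment set up as in the
sketch under the first message.

// 1-second tumbling window for the aggregation, 5-second event-time interval
// join; only the three columns declared on the QueryResult sink are selected.
tEnv.sqlUpdate(
    "INSERT INTO QueryResult "
    + "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
    + "FROM ItemTransactions AS t, Queries AS q "
    + "WHERE t.itemId = q.itemId "
    + "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
    + "GROUP BY t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '1' SECOND)");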

On Fri, Mar 6, 2020 at 3:09 PM Kurt Young <yk...@gmail.com> wrote:

> @Gyula Fóra <gy...@gmail.com> I think your query is right, we should
> produce insert only results if you have event time and watermark defined.
> I've create https://issues.apache.org/jira/browse/FLINK-16466 to track
> this issue.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <yk...@gmail.com> wrote:
>
>> Actually this use case lead me to start thinking about one question:
>> If watermark is enabled, could we also support GROUP BY event_time
>> instead of forcing
>> user defining a window based on the event_time.
>>
>> GROUP BY a standalone event_time can also be treated as a special window,
>> which has
>> both start_time and end_time equals to event_time. And when watermark
>> surpass the event_time,
>> we can still get the complete data of such group and do required
>> aggregation and then emit
>> insert only results.
>>
>> That would ease user's burden for not having to define a window when they
>> already have event
>> time and watermark defined.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <im...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Does tumbling 5 seconds for aggregation meet your need? For example:
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
>>> SECOND), sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GROUP BY
>>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> I see, maybe I just dont understand how to properly express what I am
>>>> trying to compute.
>>>>
>>>> Basically I want to aggregate the quantities of the transactions that
>>>> happened in the 5 seconds before the query.
>>>> Every query.id belongs to a single query (event_time, itemid) but
>>>> still I have to group :/
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> I think the issue is not caused by event time interval join, but the
>>>>> aggregation after the join:
>>>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>>>
>>>>> In this case, there is still no chance for Flink to determine whether
>>>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>>>> As a comparison, if you change the grouping key to a window which
>>>>> based only on q.event_time, then the query would emit insert only results.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>>>>>> all delete messages").
>>>>>>
>>>>>> As for the data completion, in my above example it is basically an
>>>>>> event time interval join.
>>>>>> With watermarks defined Flink should be able to compute results once
>>>>>> in exactly the same way as for the tumbling window.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>
>>>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>>>> all delete messages" flag? With this
>>>>>>> flag turned on, Flink will only send insert messages which
>>>>>>> corresponding current correct results to kafka and
>>>>>>> drop all retractions and deletes on the fly.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>>
>>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>>> the result of a group, tumble window aggregate to Kafka and not this window
>>>>>>>> join / aggregate.
>>>>>>>>
>>>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>>>> Flink will only fire a final result for
>>>>>>>> each window at once, no modification or retractions will happen
>>>>>>>> after a window is calculated and fired.
>>>>>>>> But with some other arbitrary aggregations, there is not enough
>>>>>>>> information for Flink to determine whether
>>>>>>>> the data is complete or not, so the framework will keep calculating
>>>>>>>> results when receiving new records and
>>>>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Benoît!
>>>>>>>>>
>>>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>>>
>>>>>>>>> I also don't completely understand at this point why I can write
>>>>>>>>> the result of a group, tumble window aggregate to Kafka and not this window
>>>>>>>>> join / aggregate.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> I'm afraid conversion to see the retractions vs inserts can't be
>>>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>>>
>>>>>>>>>> You might want to go lower level and implement a
>>>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>>>>>> 'retract' signal.
>>>>>>>>>> You can then filter the DataStream accordingly before passing to
>>>>>>>>>> the KafkaTableSink.
>>>>>>>>>>
>>>>>>>>>> Hope this helps.
>>>>>>>>>>
>>>>>>>>>> Best regards
>>>>>>>>>> Benoît
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>>>> [3]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Roman,
>>>>>>>>>>>
>>>>>>>>>>> This is the core logic:
>>>>>>>>>>>
>>>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>>> queryId    BIGINT,
>>>>>>>>>>>   itemId    STRING,
>>>>>>>>>>>   quantity INT
>>>>>>>>>>> ) WITH (
>>>>>>>>>>> 'connector.type'     = 'kafka',
>>>>>>>>>>> 'connector.version' = 'universal',
>>>>>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>>> 'format.type' = 'json'
>>>>>>>>>>> );
>>>>>>>>>>>
>>>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>>>> FROM
>>>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>>>   Queries AS q
>>>>>>>>>>> WHERE
>>>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>>>>>> q.event_time
>>>>>>>>>>> GROUP BY
>>>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>>>
>>>>>>>>>>> And the error I get is:
>>>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException:
>>>>>>>>>>> Invalid SQL update statement.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>>
>>>>>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>>>>>> updates to Kafka here.
>>>>>>>>>>>
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you provide the code of your Flink program, the error
>>>>>>>>>>>> with stacktrace and the Flink version?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.,
>>>>>>>>>>>> Roman
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL
>>>>>>>>>>>>> API and I am trying to play around with it implementing and running a few
>>>>>>>>>>>>> use-cases.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>>>>
>>>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>>>> changes."
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I understand correctly the problem here is that since
>>>>>>>>>>>>> updates are possible within a single group, we have a retract stream and
>>>>>>>>>>>>> the Kafka Sink cannot handle that. I tried to search for the solution but I
>>>>>>>>>>>>> haven't found any satisfying answers.
>>>>>>>>>>>>>
>>>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous
>>>>>>>>>>>>> values and just always keep sending the latest (like you would see it on
>>>>>>>>>>>>> the CLI output).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you!
>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Benoît Paris
>>>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>>>> http://benoit.paris
>>>>>>>>>> http://explicable.ml
>>>>>>>>>>
>>>>>>>>>

Re: Writing retract streams to Kafka

Posted by Kurt Young <yk...@gmail.com>.
@Gyula Fóra <gy...@gmail.com> I think your query is right; we should
produce insert-only results if you have event time and a watermark defined.
I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this
issue.

Best,
Kurt


On Fri, Mar 6, 2020 at 12:14 PM Kurt Young <yk...@gmail.com> wrote:

> Actually this use case lead me to start thinking about one question:
> If watermark is enabled, could we also support GROUP BY event_time instead
> of forcing
> user defining a window based on the event_time.
>
> GROUP BY a standalone event_time can also be treated as a special window,
> which has
> both start_time and end_time equals to event_time. And when watermark
> surpass the event_time,
> we can still get the complete data of such group and do required
> aggregation and then emit
> insert only results.
>
> That would ease user's burden for not having to define a window when they
> already have event
> time and watermark defined.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <im...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Does tumbling 5 seconds for aggregation meet your need? For example:
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
>> SECOND), sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> I see, maybe I just dont understand how to properly express what I am
>>> trying to compute.
>>>
>>> Basically I want to aggregate the quantities of the transactions that
>>> happened in the 5 seconds before the query.
>>> Every query.id belongs to a single query (event_time, itemid) but still
>>> I have to group :/
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> I think the issue is not caused by event time interval join, but the
>>>> aggregation after the join:
>>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>>
>>>> In this case, there is still no chance for Flink to determine whether
>>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>>> As a comparison, if you change the grouping key to a window which based
>>>> only on q.event_time, then the query would emit insert only results.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com>
>>>> wrote:
>>>>
>>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>>>>> all delete messages").
>>>>>
>>>>> As for the data completion, in my above example it is basically an
>>>>> event time interval join.
>>>>> With watermarks defined Flink should be able to compute results once
>>>>> in exactly the same way as for the tumbling window.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>
>>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>>> all delete messages" flag? With this
>>>>>> flag turned on, Flink will only send insert messages which
>>>>>> corresponding current correct results to kafka and
>>>>>> drop all retractions and deletes on the fly.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>
>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>> the result of a group, tumble window aggregate to Kafka and not this window
>>>>>>> join / aggregate.
>>>>>>>
>>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>>> Flink will only fire a final result for
>>>>>>> each window at once, no modification or retractions will happen
>>>>>>> after a window is calculated and fired.
>>>>>>> But with some other arbitrary aggregations, there is not enough
>>>>>>> information for Flink to determine whether
>>>>>>> the data is complete or not, so the framework will keep calculating
>>>>>>> results when receiving new records and
>>>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Benoît!
>>>>>>>>
>>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>>
>>>>>>>> I also don't completely understand at this point why I can write
>>>>>>>> the result of a group, tumble window aggregate to Kafka and not this window
>>>>>>>> join / aggregate.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> I'm afraid conversion to see the retractions vs inserts can't be
>>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>>
>>>>>>>>> You might want to go lower level and implement a
>>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>>>>> 'retract' signal.
>>>>>>>>> You can then filter the DataStream accordingly before passing to
>>>>>>>>> the KafkaTableSink.
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Benoît
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>>> [3]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Roman,
>>>>>>>>>>
>>>>>>>>>> This is the core logic:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>> queryId    BIGINT,
>>>>>>>>>>   itemId    STRING,
>>>>>>>>>>   quantity INT
>>>>>>>>>> ) WITH (
>>>>>>>>>> 'connector.type'     = 'kafka',
>>>>>>>>>> 'connector.version' = 'universal',
>>>>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>> 'format.type' = 'json'
>>>>>>>>>> );
>>>>>>>>>>
>>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>>> FROM
>>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>>   Queries AS q
>>>>>>>>>> WHERE
>>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>>>>> q.event_time
>>>>>>>>>> GROUP BY
>>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>>
>>>>>>>>>> And the error I get is:
>>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException:
>>>>>>>>>> Invalid SQL update statement.
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>>
>>>>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>>>>> updates to Kafka here.
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>
>>>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All!
>>>>>>>>>>>>
>>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>>>>> use-cases.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>>>
>>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>>> changes."
>>>>>>>>>>>>
>>>>>>>>>>>> If I understand correctly the problem here is that since
>>>>>>>>>>>> updates are possible within a single group, we have a retract stream and
>>>>>>>>>>>> the Kafka Sink cannot handle that. I tried to search for the solution but I
>>>>>>>>>>>> haven't found any satisfying answers.
>>>>>>>>>>>>
>>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous
>>>>>>>>>>>> values and just always keep sending the latest (like you would see it on
>>>>>>>>>>>> the CLI output).
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you!
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Benoît Paris
>>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>>> http://benoit.paris
>>>>>>>>> http://explicable.ml
>>>>>>>>>
>>>>>>>>

Re: Writing retract streams to Kafka

Posted by Kurt Young <yk...@gmail.com>.
Actually this use case led me to start thinking about one question:
If a watermark is enabled, could we also support GROUP BY event_time instead
of forcing users to define a window based on the event_time?

GROUP BY a standalone event_time can also be treated as a special window
whose start_time and end_time both equal the event_time. When the watermark
surpasses the event_time, we can still get the complete data of such a group,
do the required aggregation, and then emit insert-only results.

That would ease users' burden by not requiring them to define a window when
they already have event time and a watermark defined.

Best,
Kurt
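
To make the proposal concrete: under this idea, Gyula's original statement
from the thread (repeated below as a sketch, with tEnv set up as in the
earlier sketches) could be planned as insert-only, because the grouping key
contains the event-time attribute q.event_time and each group is complete
once the watermark passes it. As of Flink 1.10 the planner still treats it as
an updating result, which is what FLINK-16466 tracks; the comments describe
the proposed behaviour, not current behaviour.

// The grouping key includes the event-time column q.event_time. Under the
// FLINK-16466 proposal, each (itemId, event_time, queryId) group could be
// finalized and emitted exactly once when the watermark passes q.event_time,
// i.e. treated like a window with start_time = end_time = event_time.
// In Flink 1.10 this is still planned as an updating (retract) result.
tEnv.sqlUpdate(
    "INSERT INTO QueryResult "
    + "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
    + "FROM ItemTransactions AS t, Queries AS q "
    + "WHERE t.itemId = q.itemId "
    + "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
    + "GROUP BY t.itemId, q.event_time, q.queryId");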


On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <im...@gmail.com> wrote:

> Hi Gyula,
>
> Does tumbling 5 seconds for aggregation meet your need? For example:
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
> SECOND), sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gy...@gmail.com> wrote:
>
>> I see, maybe I just dont understand how to properly express what I am
>> trying to compute.
>>
>> Basically I want to aggregate the quantities of the transactions that
>> happened in the 5 seconds before the query.
>> Every query.id belongs to a single query (event_time, itemid) but still
>> I have to group :/
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <yk...@gmail.com> wrote:
>>
>>> I think the issue is not caused by event time interval join, but the
>>> aggregation after the join:
>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>
>>> In this case, there is still no chance for Flink to determine whether
>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>> As a comparison, if you change the grouping key to a window which based
>>> only on q.event_time, then the query would emit insert only results.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>>> delete messages").
>>>>
>>>> As for the data completion, in my above example it is basically an
>>>> event time interval join.
>>>> With watermarks defined Flink should be able to compute results once in
>>>> exactly the same way as for the tumbling window.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>> all delete messages" flag? With this
>>>>> flag turned on, Flink will only send insert messages which
>>>>> corresponding current correct results to kafka and
>>>>> drop all retractions and deletes on the fly.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>
>>>>>> > I also don't completely understand at this point why I can write
>>>>>> the result of a group, tumble window aggregate to Kafka and not this window
>>>>>> join / aggregate.
>>>>>>
>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>> Flink will only fire a final result for
>>>>>> each window at once, no modification or retractions will happen after
>>>>>> a window is calculated and fired.
>>>>>> But with some other arbitrary aggregations, there is not enough
>>>>>> information for Flink to determine whether
>>>>>> the data is complete or not, so the framework will keep calculating
>>>>>> results when receiving new records and
>>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Benoît!
>>>>>>>
>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>
>>>>>>> I also don't completely understand at this point why I can write the
>>>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>>>> join / aggregate.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> I'm afraid conversion to see the retractions vs inserts can't be
>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>
>>>>>>>> You might want to go lower level and implement a
>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>>>> 'retract' signal.
>>>>>>>> You can then filter the DataStream accordingly before passing to
>>>>>>>> the KafkaTableSink.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Benoît
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>> [3]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Roman,
>>>>>>>>>
>>>>>>>>> This is the core logic:
>>>>>>>>>
>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>> queryId    BIGINT,
>>>>>>>>>   itemId    STRING,
>>>>>>>>>   quantity INT
>>>>>>>>> ) WITH (
>>>>>>>>> 'connector.type'     = 'kafka',
>>>>>>>>> 'connector.version' = 'universal',
>>>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>> 'format.type' = 'json'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>> FROM
>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>   Queries AS q
>>>>>>>>> WHERE
>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>>>> q.event_time
>>>>>>>>> GROUP BY
>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>
>>>>>>>>> And the error I get is:
>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException:
>>>>>>>>> Invalid SQL update statement.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>
>>>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>>>> updates to Kafka here.
>>>>>>>>>
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>>
>>>>>>>>>> Thanks.,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All!
>>>>>>>>>>>
>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>>>> use-cases.
>>>>>>>>>>>
>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>>
>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>> changes."
>>>>>>>>>>>
>>>>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>>>>> are possible within a single group, we have a retract stream and the Kafka
>>>>>>>>>>> Sink cannot handle that. I tried to search for the solution but I haven't
>>>>>>>>>>> found any satisfying answers.
>>>>>>>>>>>
>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>>>> and just always keep sending the latest (like you would see it on the CLI
>>>>>>>>>>> output).
>>>>>>>>>>>
>>>>>>>>>>> Thank you!
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Benoît Paris
>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>> http://benoit.paris
>>>>>>>> http://explicable.ml
>>>>>>>>
>>>>>>>

Re: Writing retract streams to Kafka

Posted by Jark Wu <im...@gmail.com>.
Hi Gyula,

Does a 5-second tumbling window for the aggregation meet your need? For example:

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
SECOND), sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);

Best,
Jark

On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gy...@gmail.com> wrote:

> I see, maybe I just dont understand how to properly express what I am
> trying to compute.
>
> Basically I want to aggregate the quantities of the transactions that
> happened in the 5 seconds before the query.
> Every query.id belongs to a single query (event_time, itemid) but still I
> have to group :/
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <yk...@gmail.com> wrote:
>
>> I think the issue is not caused by event time interval join, but the
>> aggregation after the join:
>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>
>> In this case, there is still no chance for Flink to determine whether the
>> groups like (itemId, eventtime, queryId) have complete data or not.
>> As a comparison, if you change the grouping key to a window which based
>> only on q.event_time, then the query would emit insert only results.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>> delete messages").
>>>
>>> As for the data completion, in my above example it is basically an event
>>> time interval join.
>>> With watermarks defined Flink should be able to compute results once in
>>> exactly the same way as for the tumbling window.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> Back to this case, I assume you are expecting something like "ignore
>>>> all delete messages" flag? With this
>>>> flag turned on, Flink will only send insert messages which
>>>> corresponding current correct results to kafka and
>>>> drop all retractions and deletes on the fly.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> > I also don't completely understand at this point why I can write the
>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>> join / aggregate.
>>>>>
>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>> Flink will only fire a final result for
>>>>> each window at once, no modification or retractions will happen after
>>>>> a window is calculated and fired.
>>>>> But with some other arbitrary aggregations, there is not enough
>>>>> information for Flink to determine whether
>>>>> the data is complete or not, so the framework will keep calculating
>>>>> results when receiving new records and
>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Benoît!
>>>>>>
>>>>>> I can see now how I can implement this myself through the provided
>>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>>> My initial motivation was to see whether we are able to write out any
>>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>>
>>>>>> I also don't completely understand at this point why I can write the
>>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>>> join / aggregate.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> I'm afraid conversion to see the retractions vs inserts can't be
>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>
>>>>>>> You might want to go lower level and implement a
>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>>> 'retract' signal.
>>>>>>> You can then filter the DataStream accordingly before passing to the
>>>>>>> KafkaTableSink.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Benoît
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Roman,
>>>>>>>>
>>>>>>>> This is the core logic:
>>>>>>>>
>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>> queryId    BIGINT,
>>>>>>>>   itemId    STRING,
>>>>>>>>   quantity INT
>>>>>>>> ) WITH (
>>>>>>>> 'connector.type'     = 'kafka',
>>>>>>>> 'connector.version' = 'universal',
>>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>> 'format.type' = 'json'
>>>>>>>> );
>>>>>>>>
>>>>>>>> INSERT INTO QueryResult
>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>> FROM
>>>>>>>>   ItemTransactions AS t,
>>>>>>>>   Queries AS q
>>>>>>>> WHERE
>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>>> q.event_time
>>>>>>>> GROUP BY
>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>
>>>>>>>> And the error I get is:
>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException:
>>>>>>>> Invalid SQL update statement.
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>
>>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>>> updates to Kafka here.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>
>>>>>>>>> Thanks.,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All!
>>>>>>>>>>
>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>>> use-cases.
>>>>>>>>>>
>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>
>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>> changes."
>>>>>>>>>>
>>>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>>>> are possible within a single group, we have a retract stream and the Kafka
>>>>>>>>>> Sink cannot handle that. I tried to search for the solution but I haven't
>>>>>>>>>> found any satisfying answers.
>>>>>>>>>>
>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>>> and just always keep sending the latest (like you would see it on the CLI
>>>>>>>>>> output).
>>>>>>>>>>
>>>>>>>>>> Thank you!
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Benoît Paris
>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>> http://benoit.paris
>>>>>>> http://explicable.ml
>>>>>>>
>>>>>>

Re: Writing retract streams to Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
I see; maybe I just don't understand how to properly express what I am
trying to compute.

Basically, I want to aggregate the quantities of the transactions that
happened in the 5 seconds before the query.
Every query.id belongs to a single query (event_time, itemId), but I still
have to group :/

Gyula

On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <yk...@gmail.com> wrote:

> I think the issue is not caused by event time interval join, but the
> aggregation after the join:
>     GROUP BY t.itemId, q.event_time, q.queryId;
>
> In this case, there is still no chance for Flink to determine whether the
> groups like (itemId, eventtime, queryId) have complete data or not.
> As a comparison, if you change the grouping key to a window which based
> only on q.event_time, then the query would emit insert only results.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com> wrote:
>
>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>> delete messages").
>>
>> As for the data completion, in my above example it is basically an event
>> time interval join.
>> With watermarks defined Flink should be able to compute results once in
>> exactly the same way as for the tumbling window.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>>
>>> Back to this case, I assume you are expecting something like "ignore all
>>> delete messages" flag? With this
>>> flag turned on, Flink will only send insert messages which corresponding
>>> current correct results to kafka and
>>> drop all retractions and deletes on the fly.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> > I also don't completely understand at this point why I can write the
>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>> join / aggregate.
>>>>
>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>> Flink will only fire a final result for
>>>> each window at once, no modification or retractions will happen after a
>>>> window is calculated and fired.
>>>> But with some other arbitrary aggregations, there is not enough
>>>> information for Flink to determine whether
>>>> the data is complete or not, so the framework will keep calculating
>>>> results when receiving new records and
>>>> retract earlier results by firing retraction/deletion messages.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Benoît!
>>>>>
>>>>> I can see now how I can implement this myself through the provided
>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>> My initial motivation was to see whether we are able to write out any
>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>
>>>>> I also don't completely understand at this point why I can write the
>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>> join / aggregate.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>>>> in pure SQL (though I'd love that feature).
>>>>>>
>>>>>> You might want to go lower level and implement a
>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>> 'retract' signal.
>>>>>> You can then filter the DataStream accordingly before passing to the
>>>>>> KafkaTableSink.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Best regards
>>>>>> Benoît
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Roman,
>>>>>>>
>>>>>>> This is the core logic:
>>>>>>>
>>>>>>> CREATE TABLE QueryResult (
>>>>>>> queryId    BIGINT,
>>>>>>>   itemId    STRING,
>>>>>>>   quantity INT
>>>>>>> ) WITH (
>>>>>>> 'connector.type'     = 'kafka',
>>>>>>> 'connector.version' = 'universal',
>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>> 'format.type' = 'json'
>>>>>>> );
>>>>>>>
>>>>>>> INSERT INTO QueryResult
>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>> FROM
>>>>>>>   ItemTransactions AS t,
>>>>>>>   Queries AS q
>>>>>>> WHERE
>>>>>>>   t.itemId = q.itemId AND
>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>> q.event_time
>>>>>>> GROUP BY
>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>
>>>>>>> And the error I get is:
>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>>>>> SQL update statement.
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>
>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>> updates to Kafka here.
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>> stacktrace and the Flink version?
>>>>>>>>
>>>>>>>> Thanks.,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All!
>>>>>>>>>
>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>> use-cases.
>>>>>>>>>
>>>>>>>>> I have a simple window join + aggregation, grouped on some id that
>>>>>>>>> I want to write to Kafka but I am hitting the following error:
>>>>>>>>>
>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>> changes."
>>>>>>>>>
>>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>>> are possible within a single group, we have a retract stream and the Kafka
>>>>>>>>> Sink cannot handle that. I tried to search for the solution but I haven't
>>>>>>>>> found any satisfying answers.
>>>>>>>>>
>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>> and just always keep sending the latest (like you would see it on the CLI
>>>>>>>>> output).
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Benoît Paris
>>>>>> Ingénieur Machine Learning Explicable
>>>>>> Tél : +33 6 60 74 23 00
>>>>>> http://benoit.paris
>>>>>> http://explicable.ml
>>>>>>
>>>>>

Re: Writing retract streams to Kafka

Posted by Kurt Young <yk...@gmail.com>.
I think the issue is not caused by the event-time interval join, but by the
aggregation after the join:
    GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no way for Flink to determine whether groups
like (itemId, event_time, queryId) have complete data or not.
As a comparison, if you change the grouping key to a window based only on
q.event_time, then the query would emit insert-only results.

Best,
Kurt
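
A quick way to see the distinction Kurt describes without wiring up a Kafka
sink (a sketch, assuming the same tEnv setup as in the earlier sketches):
toAppendStream only works for insert-only results, so the window-grouped
variant converts cleanly, while the original grouping has to go through
toRetractStream.

// Insert-only after the suggested change: group on a window over q.event_time.
Table windowed = tEnv.sqlQuery(
    "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
    + "FROM ItemTransactions AS t, Queries AS q "
    + "WHERE t.itemId = q.itemId "
    + "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
    + "GROUP BY t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND)");

// Succeeds, because the planner can prove this result only ever receives inserts.
tEnv.toAppendStream(windowed, Row.class).print();

// The GROUP BY t.itemId, q.event_time, q.queryId version would fail here with
// the same kind of "only insert changes" error and needs toRetractStream instead.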


On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gy...@gmail.com> wrote:

> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
> delete messages").
>
> As for the data completion, in my above example it is basically an event
> time interval join.
> With watermarks defined Flink should be able to compute results once in
> exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:
>
>> Back to this case, I assume you are expecting something like "ignore all
>> delete messages" flag? With this
>> flag turned on, Flink will only send insert messages which corresponding
>> current correct results to kafka and
>> drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>>
>>> > I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> If you are doing a tumble window aggregate with watermark enabled, Flink
>>> will only fire a final result for
>>> each window at once, no modification or retractions will happen after a
>>> window is calculated and fired.
>>> But with some other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether
>>> the data is complete or not, so the framework will keep calculating
>>> results when receiving new records and
>>> retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> Thanks Benoît!
>>>>
>>>> I can see now how I can implement this myself through the provided sink
>>>> interfaces but I was trying to avoid having to write code for this :D
>>>> My initial motivation was to see whether we are able to write out any
>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>
>>>> I also don't completely understand at this point why I can write the
>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>> join / aggregate.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>> benoit.paris@centraliens-lille.org> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>>> in pure SQL (though I'd love that feature).
>>>>>
>>>>> You might want to go lower level and implement a
>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>> 'retract' signal.
>>>>> You can then filter the DataStream accordingly before passing to the
>>>>> KafkaTableSink.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Best regards
>>>>> Benoît
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>
>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Roman,
>>>>>>
>>>>>> This is the core logic:
>>>>>>
>>>>>> CREATE TABLE QueryResult (
>>>>>> queryId    BIGINT,
>>>>>>   itemId    STRING,
>>>>>>   quantity INT
>>>>>> ) WITH (
>>>>>> 'connector.type'     = 'kafka',
>>>>>> 'connector.version' = 'universal',
>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>> 'format.type' = 'json'
>>>>>> );
>>>>>>
>>>>>> INSERT INTO QueryResult
>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>> FROM
>>>>>>   ItemTransactions AS t,
>>>>>>   Queries AS q
>>>>>> WHERE
>>>>>>   t.itemId = q.itemId AND
>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>> q.event_time
>>>>>> GROUP BY
>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>
>>>>>> And the error I get is:
>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>>>> SQL update statement.
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>
>>>>>> I am wondering what could I do to just simply pump the result updates
>>>>>> to Kafka here.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>> stacktrace and the Flink version?
>>>>>>>
>>>>>>> Thanks.,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All!
>>>>>>>>
>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and
>>>>>>>> I am trying to play around with it implementing and running a few use-cases.
>>>>>>>>
>>>>>>>> I have a simple window join + aggregation, grouped on some id that
>>>>>>>> I want to write to Kafka but I am hitting the following error:
>>>>>>>>
>>>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>>>
>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>> are possible within a single group, we have a retract stream and the Kafka
>>>>>>>> Sink cannot handle that. I tried to search for the solution but I haven't
>>>>>>>> found any satisfying answers.
>>>>>>>>
>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>> and just always keep sending the latest (like you would see it on the CLI
>>>>>>>> output).
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>>
>>>>

Re: Writing retract streams to Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
That's exactly the kind of behaviour I am looking for, Kurt ("ignore all
delete messages").

As for data completeness, my example above is basically an event-time
interval join.
With watermarks defined, Flink should be able to compute the results once,
in exactly the same way as for the tumbling window.
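
For reference, with the Flink 1.10 DDL the watermark is declared on the
source table itself. A minimal sketch of what that looks like (the actual
ItemTransactions DDL is not shown in this thread, so the exact columns and
the topic name below are assumptions):

CREATE TABLE ItemTransactions (
  itemId     STRING,
  quantity   INT,
  event_time TIMESTAMP(3),
  -- event_time becomes a rowtime attribute; tolerate 5 seconds of lateness
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector.type'    = 'kafka',
  'connector.version' = 'universal',
  'connector.topic'   = 'transaction.log.1',   -- assumed topic name
  'connector.properties.bootstrap.servers' = '<broker>',
  'format.type'       = 'json'
);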

Gyula

On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <yk...@gmail.com> wrote:

> Back to this case, I assume you are expecting something like an "ignore
> all delete messages" flag? With this flag turned on, Flink would only send
> insert messages that correspond to the current correct results to Kafka,
> and drop all retractions and deletes on the fly.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:
>
>> > I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> If you are doing a tumble window aggregate with watermark enabled, Flink
>> will only fire a final result for
>> each window at once, no modification or retractions will happen after a
>> window is calculated and fired.
>> But with some other arbitrary aggregations, there is not enough
>> information for Flink to determine whether
>> the data is complete or not, so the framework will keep calculating
>> results when receiving new records and
>> retract earlier results by firing retraction/deletion messages.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> Thanks Benoît!
>>>
>>> I can see now how I can implement this myself through the provided sink
>>> interfaces but I was trying to avoid having to write code for this :D
>>> My initial motivation was to see whether we are able to write out any
>>> kind of table to Kafka as a simple stream of "upserts".
>>>
>>> I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>> benoit.paris@centraliens-lille.org> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>> in pure SQL (though I'd love that feature).
>>>>
>>>> You might want to go lower level and implement a RetractStreamTableSink
>>>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>>>> a emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);, in which the
>>>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>>>> You can then filter the DataStream accordingly before passing to the
>>>> KafkaTableSink.
>>>>
>>>> Hope this helps.
>>>>
>>>> Best regards
>>>> Benoît
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>
>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>>
>>>>> Hi Roman,
>>>>>
>>>>> This is the core logic:
>>>>>
>>>>> CREATE TABLE QueryResult (
>>>>> queryId    BIGINT,
>>>>>   itemId    STRING,
>>>>>   quantity INT
>>>>> ) WITH (
>>>>> 'connector.type'     = 'kafka',
>>>>> 'connector.version' = 'universal',
>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>> 'format.type' = 'json'
>>>>> );
>>>>>
>>>>> INSERT INTO QueryResult
>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>> FROM
>>>>>   ItemTransactions AS t,
>>>>>   Queries AS q
>>>>> WHERE
>>>>>   t.itemId = q.itemId AND
>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>> q.event_time
>>>>> GROUP BY
>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>
>>>>> And the error I get is:
>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>>> SQL update statement.
>>>>> at
>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>> at
>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>> at
>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>> at
>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>> at
>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>> at
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>> at
>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>
>>>>> I am wondering what could I do to just simply pump the result updates
>>>>> to Kafka here.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>> khachatryan.roman@gmail.com> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> Could you provide the code of your Flink program, the error with
>>>>>> stacktrace and the Flink version?
>>>>>>
>>>>>> Thanks.,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All!
>>>>>>>
>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and
>>>>>>> I am trying to play around with it implementing and running a few use-cases.
>>>>>>>
>>>>>>> I have a simple window join + aggregation, grouped on some id that I
>>>>>>> want to write to Kafka but I am hitting the following error:
>>>>>>>
>>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>>
>>>>>>> If I understand correctly the problem here is that since updates are
>>>>>>> possible within a single group, we have a retract stream and the Kafka Sink
>>>>>>> cannot handle that. I tried to search for the solution but I haven't found
>>>>>>> any satisfying answers.
>>>>>>>
>>>>>>> How can I simply tell the INSERT logic to ignore previous values and
>>>>>>> just always keep sending the latest (like you would see it on the CLI
>>>>>>> output).
>>>>>>>
>>>>>>> Thank you!
>>>>>>> Gyula
>>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Benoît Paris
>>>> Ingénieur Machine Learning Explicable
>>>> Tél : +33 6 60 74 23 00
>>>> http://benoit.paris
>>>> http://explicable.ml
>>>>
>>>

Re: Writing retract streams to Kafka

Posted by Kurt Young <yk...@gmail.com>.
Back to this case, I assume you are expecting something like an "ignore all
delete messages" flag? With this flag turned on, Flink would only send insert
messages that correspond to the current correct results to Kafka, and drop
all retractions and deletes on the fly.
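
Until such a flag exists, one workaround is to leave pure SQL and filter the
retractions yourself at the DataStream level. A rough sketch (Flink 1.10
Java API; the table registration and the serialization are simplified
placeholders, only the filtering step is the point here):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractFilteringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register ItemTransactions / Queries here, e.g. via tEnv.sqlUpdate(ddl).
        Table result = tEnv.sqlQuery(
            "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
                + "FROM ItemTransactions AS t, Queries AS q "
                + "WHERE t.itemId = q.itemId AND t.event_time "
                + "  BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
                + "GROUP BY t.itemId, q.event_time, q.queryId");

        // toRetractStream yields Tuple2<Boolean, Row>: f0 == true is an
        // accumulate message, f0 == false a retraction. Keep only accumulates.
        DataStream<Row> insertsOnly = tEnv
            .toRetractStream(result, Row.class)
            .filter(change -> change.f0)
            .map(change -> change.f1)
            .returns(result.getSchema().toRowType());

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<broker>");

        insertsOnly
            .map(Row::toString) // placeholder; use a JSON schema in practice
            .addSink(new FlinkKafkaProducer<>(
                "query.output.log.1", new SimpleStringSchema(), kafkaProps));

        env.execute("retract-filtering sketch");
    }
}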

Best,
Kurt


On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <yk...@gmail.com> wrote:

> > I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> If you are doing a tumble window aggregate with watermark enabled, Flink
> will only fire a final result for
> each window at once, no modification or retractions will happen after a
> window is calculated and fired.
> But with some other arbitrary aggregations, there is not enough
> information for Flink to determine whether
> the data is complete or not, so the framework will keep calculating
> results when receiving new records and
> retract earlier results by firing retraction/deletion messages.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com> wrote:
>
>> Thanks Benoît!
>>
>> I can see now how I can implement this myself through the provided sink
>> interfaces but I was trying to avoid having to write code for this :D
>> My initial motivation was to see whether we are able to write out any
>> kind of table to Kafka as a simple stream of "upserts".
>>
>> I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>> benoit.paris@centraliens-lille.org> wrote:
>>
>>> Hi Gyula,
>>>
>>> I'm afraid conversion to see the retractions vs inserts can't be done in
>>> pure SQL (though I'd love that feature).
>>>
>>> You might want to go lower level and implement a RetractStreamTableSink
>>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>>> a emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);, in which the
>>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>>> You can then filter the DataStream accordingly before passing to the
>>> KafkaTableSink.
>>>
>>> Hope this helps.
>>>
>>> Best regards
>>> Benoît
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>
>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> Hi Roman,
>>>>
>>>> This is the core logic:
>>>>
>>>> CREATE TABLE QueryResult (
>>>> queryId    BIGINT,
>>>>   itemId    STRING,
>>>>   quantity INT
>>>> ) WITH (
>>>> 'connector.type'     = 'kafka',
>>>> 'connector.version' = 'universal',
>>>> 'connector.topic'   = 'query.output.log.1',
>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>> 'format.type' = 'json'
>>>> );
>>>>
>>>> INSERT INTO QueryResult
>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>> FROM
>>>>   ItemTransactions AS t,
>>>>   Queries AS q
>>>> WHERE
>>>>   t.itemId = q.itemId AND
>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>> q.event_time
>>>> GROUP BY
>>>>   t.itemId, q.event_time, q.queryId;
>>>>
>>>> And the error I get is:
>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>> SQL update statement.
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>> at
>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>> at
>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>> Caused by: org.apache.flink.table.api.TableException:
>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>
>>>> I am wondering what could I do to just simply pump the result updates
>>>> to Kafka here.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>> khachatryan.roman@gmail.com> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> Could you provide the code of your Flink program, the error with
>>>>> stacktrace and the Flink version?
>>>>>
>>>>> Thanks.,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I
>>>>>> am trying to play around with it implementing and running a few use-cases.
>>>>>>
>>>>>> I have a simple window join + aggregation, grouped on some id that I
>>>>>> want to write to Kafka but I am hitting the following error:
>>>>>>
>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>
>>>>>> If I understand correctly the problem here is that since updates are
>>>>>> possible within a single group, we have a retract stream and the Kafka Sink
>>>>>> cannot handle that. I tried to search for the solution but I haven't found
>>>>>> any satisfying answers.
>>>>>>
>>>>>> How can I simply tell the INSERT logic to ignore previous values and
>>>>>> just always keep sending the latest (like you would see it on the CLI
>>>>>> output).
>>>>>>
>>>>>> Thank you!
>>>>>> Gyula
>>>>>>
>>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>

Re: Writing retract streams to Kafka

Posted by Kurt Young <yk...@gmail.com>.
> I also don't completely understand at this point why I can write the
result of a group, tumble window aggregate to Kafka and not this window
join / aggregate.

If you are doing a tumbling window aggregate with watermarks enabled, Flink
fires a final result for each window only once; no modifications or
retractions happen after a window has been calculated and fired.
But with some other arbitrary aggregations, there is not enough information
for Flink to determine whether the data is complete or not, so the framework
keeps recalculating results as new records arrive and retracts earlier
results by firing retraction/deletion messages.
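
For example, a tumbling window aggregate on the transaction stream alone is
append-only, so an AppendStreamTableSink (like the Kafka sink) accepts it. A
minimal sketch, assuming event_time is a rowtime attribute with a watermark
defined:

SELECT
  t.itemId,
  TUMBLE_START(t.event_time, INTERVAL '5' SECOND) AS window_start,
  SUM(t.quantity) AS quantity
FROM ItemTransactions AS t
GROUP BY
  t.itemId,
  TUMBLE(t.event_time, INTERVAL '5' SECOND);

Each (itemId, window) pair produces exactly one result row once the watermark
passes the window end, so no retractions are ever emitted.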

Best,
Kurt


On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gy...@gmail.com> wrote:

> Thanks Benoît!
>
> I can see now how I can implement this myself through the provided sink
> interfaces but I was trying to avoid having to write code for this :D
> My initial motivation was to see whether we are able to write out any kind
> of table to Kafka as a simple stream of "upserts".
>
> I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> Cheers,
> Gyula
>
> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
> benoit.paris@centraliens-lille.org> wrote:
>
>> Hi Gyula,
>>
>> I'm afraid conversion to see the retractions vs inserts can't be done in
>> pure SQL (though I'd love that feature).
>>
>> You might want to go lower level and implement a RetractStreamTableSink
>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>> a emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);, in which the
>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>> You can then filter the DataStream accordingly before passing to the
>> KafkaTableSink.
>>
>> Hope this helps.
>>
>> Best regards
>> Benoît
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>
>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> Hi Roman,
>>>
>>> This is the core logic:
>>>
>>> CREATE TABLE QueryResult (
>>> queryId    BIGINT,
>>>   itemId    STRING,
>>>   quantity INT
>>> ) WITH (
>>> 'connector.type'     = 'kafka',
>>> 'connector.version' = 'universal',
>>> 'connector.topic'   = 'query.output.log.1',
>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>> 'format.type' = 'json'
>>> );
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GROUP BY
>>>   t.itemId, q.event_time, q.queryId;
>>>
>>> And the error I get is:
>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>>> update statement.
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>> at java.util.Optional.ifPresent(Optional.java:159)
>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> AppendStreamTableSink requires that Table has only insert changes.
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>
>>> I am wondering what could I do to just simply pump the result updates to
>>> Kafka here.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>> khachatryan.roman@gmail.com> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> Could you provide the code of your Flink program, the error with
>>>> stacktrace and the Flink version?
>>>>
>>>> Thanks.,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>>
>>>>> Hi All!
>>>>>
>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I
>>>>> am trying to play around with it implementing and running a few use-cases.
>>>>>
>>>>> I have a simple window join + aggregation, grouped on some id that I
>>>>> want to write to Kafka but I am hitting the following error:
>>>>>
>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>
>>>>> If I understand correctly the problem here is that since updates are
>>>>> possible within a single group, we have a retract stream and the Kafka Sink
>>>>> cannot handle that. I tried to search for the solution but I haven't found
>>>>> any satisfying answers.
>>>>>
>>>>> How can I simply tell the INSERT logic to ignore previous values and
>>>>> just always keep sending the latest (like you would see it on the CLI
>>>>> output).
>>>>>
>>>>> Thank you!
>>>>> Gyula
>>>>>
>>>>
>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning Explicable
>> Tél : +33 6 60 74 23 00
>> http://benoit.paris
>> http://explicable.ml
>>
>

Re: Writing retract streams to Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks Benoît!

I can see now how I can implement this myself through the provided sink
interfaces but I was trying to avoid having to write code for this :D
My initial motivation was to see whether we are able to write out any kind
of table to Kafka as a simple stream of "upserts".

I also don't completely understand at this point why I can write the result
of a group, tumble window aggregate to Kafka and not this window join /
aggregate.

Cheers,
Gyula

On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
benoit.paris@centraliens-lille.org> wrote:

> Hi Gyula,
>
> I'm afraid conversion to see the retractions vs inserts can't be done in
> pure SQL (though I'd love that feature).
>
> You might want to go lower level and implement a RetractStreamTableSink
> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
> a emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);, in which the
> Boolean flag will give you an 'accumulate' or 'retract' signal.
> You can then filter the DataStream accordingly before passing to the
> KafkaTableSink.
>
> Hope this helps.
>
> Best regards
> Benoît
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>
> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com> wrote:
>
>> Hi Roman,
>>
>> This is the core logic:
>>
>> CREATE TABLE QueryResult (
>> queryId    BIGINT,
>>   itemId    STRING,
>>   quantity INT
>> ) WITH (
>> 'connector.type'     = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic'   = 'query.output.log.1',
>> 'connector.properties.bootstrap.servers' = '<broker>',
>> 'format.type' = 'json'
>> );
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.event_time, q.queryId;
>>
>> And the error I get is:
>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>> update statement.
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>> at
>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>> at
>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>> Caused by: org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>
>> I am wondering what could I do to just simply pump the result updates to
>> Kafka here.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>> khachatryan.roman@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Could you provide the code of your Flink program, the error with
>>> stacktrace and the Flink version?
>>>
>>> Thanks.,
>>> Roman
>>>
>>>
>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com> wrote:
>>>
>>>> Hi All!
>>>>
>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I
>>>> am trying to play around with it implementing and running a few use-cases.
>>>>
>>>> I have a simple window join + aggregation, grouped on some id that I
>>>> want to write to Kafka but I am hitting the following error:
>>>>
>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>
>>>> If I understand correctly the problem here is that since updates are
>>>> possible within a single group, we have a retract stream and the Kafka Sink
>>>> cannot handle that. I tried to search for the solution but I haven't found
>>>> any satisfying answers.
>>>>
>>>> How can I simply tell the INSERT logic to ignore previous values and
>>>> just always keep sending the latest (like you would see it on the CLI
>>>> output).
>>>>
>>>> Thank you!
>>>> Gyula
>>>>
>>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>

Re: Writing retract streams to Kafka

Posted by Benoît Paris <be...@centraliens-lille.org>.
Hi Gyula,

I'm afraid distinguishing the retractions from the inserts can't be done in
pure SQL (though I'd love that feature).

You might want to go lower level and implement a RetractStreamTableSink
[1][2] that you would wrap around a KafkaTableSink [3]. This gives you an
emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method, in which
the Boolean flag carries the 'accumulate' or 'retract' signal.
You can then filter the DataStream accordingly before passing it to the
KafkaTableSink.
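
A rough sketch of that wrapper, using the non-deprecated consumeDataStream
variant (the configure handling is simplified here, so treat it as an
outline and check the javadocs below for the exact 1.10 contract):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Drops the retraction messages and forwards the accumulate messages to a
// wrapped, append-only KafkaTableSink.
public class InsertOnlyKafkaSink implements RetractStreamTableSink<Row> {

    private final KafkaTableSink kafkaSink;

    public InsertOnlyKafkaSink(KafkaTableSink kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return kafkaSink.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return kafkaSink.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // simplified; normally this should return a configured copy
    }

    @Override
    public DataStreamSink<?> consumeDataStream(
            DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> insertsOnly = dataStream
            .filter(change -> change.f0)   // true = accumulate, false = retract
            .map(change -> change.f1)
            .returns(getRecordType());
        return kafkaSink.consumeDataStream(insertsOnly);
    }
}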

Hope this helps.

Best regards
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html

On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gy...@gmail.com> wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
> queryId    BIGINT,
>   itemId    STRING,
>   quantity INT
> ) WITH (
> 'connector.type'     = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic'   = 'query.output.log.1',
> 'connector.properties.bootstrap.servers' = '<broker>',
> 'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> update statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I am wondering what could I do to just simply pump the result updates to
> Kafka here.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
> khachatryan.roman@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Could you provide the code of your Flink program, the error with
>> stacktrace and the Flink version?
>>
>> Thanks.,
>> Roman
>>
>>
>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> Hi All!
>>>
>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>>> trying to play around with it implementing and running a few use-cases.
>>>
>>> I have a simple window join + aggregation, grouped on some id that I
>>> want to write to Kafka but I am hitting the following error:
>>>
>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>
>>> If I understand correctly the problem here is that since updates are
>>> possible within a single group, we have a retract stream and the Kafka Sink
>>> cannot handle that. I tried to search for the solution but I haven't found
>>> any satisfying answers.
>>>
>>> How can I simply tell the INSERT logic to ignore previous values and
>>> just always keep sending the latest (like you would see it on the CLI
>>> output).
>>>
>>> Thank you!
>>> Gyula
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml

Re: Writing retract streams to Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Roman,

This is the core logic:

CREATE TABLE QueryResult (
  queryId    BIGINT,
  itemId     STRING,
  quantity   INT
) WITH (
  'connector.type'    = 'kafka',
  'connector.version' = 'universal',
  'connector.topic'   = 'query.output.log.1',
  'connector.properties.bootstrap.servers' = '<broker>',
  'format.type'       = 'json'
);

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.event_time, q.queryId;

And the error I get is:
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
update statement.
at
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
at
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
at
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink
requires that Table has only insert changes.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I am wondering what could I do to just simply pump the result updates to
Kafka here.

Gyula

On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
khachatryan.roman@gmail.com> wrote:

> Hi Gyula,
>
> Could you provide the code of your Flink program, the error with
> stacktrace and the Flink version?
>
> Thanks.,
> Roman
>
>
> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com> wrote:
>
>> Hi All!
>>
>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>> trying to play around with it implementing and running a few use-cases.
>>
>> I have a simple window join + aggregation, grouped on some id that I want
>> to write to Kafka but I am hitting the following error:
>>
>> "AppendStreamTableSink requires that Table has only insert changes."
>>
>> If I understand correctly the problem here is that since updates are
>> possible within a single group, we have a retract stream and the Kafka Sink
>> cannot handle that. I tried to search for the solution but I haven't found
>> any satisfying answers.
>>
>> How can I simply tell the INSERT logic to ignore previous values and just
>> always keep sending the latest (like you would see it on the CLI output).
>>
>> Thank you!
>> Gyula
>>
>

Re: Writing retract streams to Kafka

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi Gyula,

Could you provide the code of your Flink program, the error with stacktrace
and the Flink version?

Thanks,
Roman


On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gy...@gmail.com> wrote:

> Hi All!
>
> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
> trying to play around with it implementing and running a few use-cases.
>
> I have a simple window join + aggregation, grouped on some id that I want
> to write to Kafka but I am hitting the following error:
>
> "AppendStreamTableSink requires that Table has only insert changes."
>
> If I understand correctly the problem here is that since updates are
> possible within a single group, we have a retract stream and the Kafka Sink
> cannot handle that. I tried to search for the solution but I haven't found
> any satisfying answers.
>
> How can I simply tell the INSERT logic to ignore previous values and just
> always keep sending the latest (like you would see it on the CLI output).
>
> Thank you!
> Gyula
>