Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/06/05 10:28:24 UTC

UpsertStreamTableSink for Aggregate Only Query

Hello,

I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate-only SQL query, for
example, select sum(revenue) from lineorder. A desirable property of the
output of a Flink job for such a query is that there is always exactly 1
row in the result set (or that the number of rows does not fall to 0 due to
retractions of previous output). In other words, I need upsert-like
semantics for the output of the query.
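
To make the difference concrete, here is a minimal plain-Java sketch (no
Flink classes; `RetractVsUpsert` and its methods are hypothetical names used
only for illustration). It records the row counts a reader could observe
when each new sum arrives as a retraction followed by an insertion, compared
with overwriting the single row in place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these types are Flink classes.
class RetractVsUpsert {

    /** Row counts observable when each update is a retraction followed by an insertion. */
    static List<Integer> retractRowCounts(long[] sums) {
        List<Long> table = new ArrayList<>();
        List<Integer> observed = new ArrayList<>();
        for (long sum : sums) {
            if (!table.isEmpty()) {
                table.clear();               // retract the previous result
                observed.add(table.size());  // a reader may see zero rows here
            }
            table.add(sum);                  // insert the new result
            observed.add(table.size());
        }
        return observed;
    }

    /** Row counts observable when each update overwrites the single row in place. */
    static List<Integer> upsertRowCounts(long[] sums) {
        List<Long> table = new ArrayList<>();
        List<Integer> observed = new ArrayList<>();
        for (long sum : sums) {
            if (table.isEmpty()) {
                table.add(sum);              // first result creates the row
            } else {
                table.set(0, sum);           // later results replace it
            }
            observed.add(table.size());
        }
        return observed;
    }

    public static void main(String[] args) {
        long[] sums = {10, 25, 40};
        System.out.println(retractRowCounts(sums)); // [1, 0, 1, 0, 1]
        System.out.println(upsertRowCounts(sums));  // [1, 1, 1]
    }
}
```

The zeros in the first list are the empty result sets I would like the
client never to observe.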

I was hopeful after reading the comments in UpsertStreamTableSink.java that
this condition is accounted for in the implementation. However, a pipeline
with the above query writing to a concrete UpsertStreamTableSink fails with
the following error: "UpsertStreamTableSink requires that Table has a full
primary keys if it is updated." Here are the relevant comments from
UpsertStreamTableSink.java for reference:

```
/**
 * Configures the unique key fields of the {@link Table} to write. The method
 * is called after {@link TableSink#configure(String[], TypeInformation[])}.
 *
 * <p>The keys array might be empty, if the table consists of a single
 * (updated) record. If the table does not have a key and is append-only, the
 * keys attribute is null.
 *
 * @param keys the field names of the table's keys, an empty array if the
 * table has a single row, and null if the table is append-only and has no key.
 */
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears to be consistent with the
observed failure and does not match the comment about "empty key array if
the table consists of a single record".

With that context, I have the following questions:

1. Is the UpsertStreamTableSink expected to consume the output of such
aggregate-only queries? Or is my interpretation of the code and comment
wrong, and I have misconfigured the UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for
solving this use case such that the client never observes an empty result
set for the output of this query?

Regards,
Satyam

Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Jark Wu <im...@gmail.com>.
Hi Satyam,

I guess your destination database table doesn't have a primary key, right?
If so, I think the upcoming 1.11 release, with its new sink interface
(FLIP-95), can resolve this better.

In the new sink interface:
- the primary key is always defined in the Flink SQL DDL
- the planner no longer infers or validates the primary key of the query
- you can tell whether the query contains UPDATE/DELETE changes or is an
INSERT-only query via the parameter of
`DynamicTableSink#getChangelogMode(queryChangeMode)`

So if `queryChangeMode` contains UPDATE changes and the DDL doesn't define
a PK, you can set a flag in your sink to indicate that it should work in
"remove-insert" mode.
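
The decision described above can be sketched in plain Java. This is only an
illustration under stated assumptions: `ChangeKind`, `WriteMode` and
`choose` are hypothetical local stand-ins, not Flink's own types, and
treating an INSERT-only query as plain append is an assumption of the
sketch rather than something the new interface prescribes.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins; a real sink would inspect the mode passed to getChangelogMode.
class SinkModeChooser {

    enum ChangeKind { INSERT, UPDATE, DELETE }

    enum WriteMode { APPEND, UPSERT, REMOVE_INSERT }

    /**
     * Picks a write strategy from the changes the query produces and the
     * primary key columns declared in the DDL (empty list = no PK).
     */
    static WriteMode choose(Set<ChangeKind> queryChanges, List<String> primaryKey) {
        boolean insertOnly = !queryChanges.contains(ChangeKind.UPDATE)
                && !queryChanges.contains(ChangeKind.DELETE);
        if (insertOnly) {
            return WriteMode.APPEND;          // assumption: nothing to overwrite
        }
        // Updating query: with a PK the sink knows which row to update,
        // without one it falls back to replacing the previous output.
        return primaryKey.isEmpty() ? WriteMode.REMOVE_INSERT : WriteMode.UPSERT;
    }

    public static void main(String[] args) {
        Set<ChangeKind> updating = EnumSet.of(ChangeKind.INSERT, ChangeKind.UPDATE);
        System.out.println(choose(updating, List.of()));        // REMOVE_INSERT
        System.out.println(choose(updating, List.of("name")));  // UPSERT
    }
}
```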

Best,
Jark

On Mon, 8 Jun 2020 at 15:40, Satyam Shekhar <sa...@gmail.com> wrote:


Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Satyam Shekhar <sa...@gmail.com>.
Hi Jark,

I wish to atomically update the destination with remove-insert. To pick
that strategy, I need some "hint" from Flink that the output is a global
aggregation with no grouping key, and that appends should overwrite the
previous value.
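
As a rough illustration of what I mean by an atomic remove-insert at the
destination, here is a plain-Java sketch. `SingleRowStore` is a
hypothetical in-memory stand-in for the destination, not a Flink or
database API; a real store would need its own transactional replace.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the destination; not a Flink or database API.
class SingleRowStore<T> {

    private final AtomicReference<T> row = new AtomicReference<>();

    /** Swaps in the new row in one atomic step and returns the row it replaced (null if none). */
    T removeInsert(T newRow) {
        return row.getAndSet(newRow);
    }

    /** Empty only before the first write; never empty between two writes. */
    Optional<T> read() {
        return Optional.ofNullable(row.get());
    }

    public static void main(String[] args) {
        SingleRowStore<Long> store = new SingleRowStore<>();
        store.removeInsert(10L);
        Long replaced = store.removeInsert(25L);
        System.out.println(replaced + " -> " + store.read().get());
    }
}
```

Because the old value is removed and the new one installed in a single
step, a concurrent reader sees either the previous sum or the new one.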

I am also exploring handling the issue in the upstream server (in the query
generation layer), where I have this knowledge from the context (similar
to what Arvid suggested). I may be able to get around this problem by
handling it upstream.

Regards,
Satyam

On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <im...@gmail.com> wrote:


Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Jark Wu <im...@gmail.com>.
Hi Satyam,

Currently, `UpsertStreamTableSink` requires the query to have a primary
key, and that key is passed to the sink via
`UpsertStreamTableSink#setKeyFields`. If the query has no primary key, an
error is thrown, as you have seen.

It should work for all GROUP BY queries (as long as the grouping keys are
not projected away after the aggregation).
A global aggregation is special: it doesn't have a primary key. But an
upsert sink requires a primary key; otherwise it doesn't know which row to
update.
How would you write such a result into an external database without a
primary key? Would you write the rows in append fashion or in
remove-insert fashion?

Best,
Jark


On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:


Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Arvid Heise <ar...@ververica.com>.
Instead of changing the query, I used to embed the query in a larger
context for similar tasks.

So if you get an arbitrary query X that produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1), then you can craft a
query that adds a dummy pk to the result.

Table original = env.sqlQuery(X);
Table withDummy = original.select("'dummy' as pk, *");
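
The same wrapping can also be done on the SQL text before it reaches
Flink. The following is a minimal sketch using plain string handling;
`DummyKeyWrapper` and `withDummyKey` are hypothetical names, and whether
the planner accepts the constant column as a key for a given query is an
assumption that would need to be checked.

```java
// Hypothetical helper; it only rewrites SQL text and does not call any Flink API.
class DummyKeyWrapper {

    /** Embeds the query in an outer SELECT that adds a constant key column. */
    static String withDummyKey(String query, String keyColumn) {
        String inner = query.trim();
        if (inner.endsWith(";")) {
            // A trailing semicolon is not valid inside a subquery, so drop it.
            inner = inner.substring(0, inner.length() - 1).trim();
        }
        return "SELECT 'dummy' AS " + keyColumn + ", t.* FROM (" + inner + ") AS t";
    }

    public static void main(String[] args) {
        System.out.println(withDummyKey("select sum(revenue) from lineorder;", "pk"));
        // SELECT 'dummy' AS pk, t.* FROM (select sum(revenue) from lineorder) AS t
    }
}
```

The inner query is left untouched, so this works without knowing anything
about the structure of X.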

On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <sa...@gmail.com>
wrote:


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Satyam Shekhar <sa...@gmail.com>.
Hey Arvid,

Thanks for the reply.

As you suggested, rewriting the query to add a dummy output column and a
group-by clause - "select 1, sum(revenue) from lineorder group by 1" -
does add a unique key column to the output, and the pipeline succeeds.

However, the application may get arbitrary SQL from the upstream server.
This makes the solution tricky - I'd have to change the query to add a
dummy grouping key to every grouping node in the query and a projection
node to drop the dummy key. I can try to account for this upstream (in the
query generation layer), but I would prefer to have it solved within the
execution engine itself.

Regards,
Satyam

On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:


Re: UpsertStreamTableSink for Aggregate Only Query

Posted by Arvid Heise <ar...@ververica.com>.
Hi Satyam,

You are right, there seems to be a disconnect between the Javadoc and the
implementation. Jark probably knows more.

In your case, couldn't you just add a dummy column containing a constant
key?

select 'revenue' AS name, sum(revenue) from lineorder

and then set the dummy field as the PK?
