Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/11/05 19:52:54 UTC

Upsert UDFs

Hello,

I'm using the Table API to do a bunch of stateful transformations on CDC
Debezium rows and then insert final documents into Elasticsearch via the ES
connector.

I've noticed that Elasticsearch is constantly deleting and then inserting
documents as they update. Ideally, there would be no delete operation for a
row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
which implies upserts are actually what it's capable of.

Therefore, I suspect it's my table plan that's causing row upserts to turn
into deletes + inserts. My plan is essentially a series of Joins and
GroupBys + UDF Aggregates (aggregating arrays of data). Possibly the UDF
Aggs following the Joins + GroupBys are causing the upserts to split into
deletes + inserts somehow. If this is correct, is it possible to write UDFs
that preserve upserts? Or am I totally off-base with my assumptions?

Thanks!
-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US <https://www.facebook.com/remindhq>

Re: Upsert UDFs

Posted by Rex Fenley <Re...@remind101.com>.
Wow, that sounds definitely better. I'll try porting our aggregates over
to use `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL
connector will respond appropriately.

Thanks for the help!

In reply to Jark Wu <im...@gmail.com>, Wed, Nov 18, 2020 at 7:20 AM.

Re: Upsert UDFs

Posted by Jark Wu <im...@gmail.com>.
Hi Rex,

Sorry for the late response.

Under the hood, if the UDTAF only implements `emitValue`, the framework
calls `emitValue` for every input record. Suppose this is a TopN UDTAF
whose `emitValue` returns the set [A, B, C] for input1 and the set
[A, B, D] for input2. After processing input2, the framework then sends
-A, -B, -C, +A, +B, +D.

But if the TopN UDTAF implements `emitUpdateWithRetract`, it can send just
-C and +D from `emitUpdateWithRetract`, because the TopN function knows
which row was updated. That is how it can "reduce the number of retracts".

Best,
Jark
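
The difference in the TopN example above can be sketched in plain Java. This
is only a simulation of the two message patterns, not Flink code: it has no
Flink dependency, and the class and method names (`RetractionSketch`,
`fullReemit`, `incremental`) are hypothetical. It assumes the previous and
current results are simple lists of row labels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetractionSketch {

    // Full re-emission: retract every row of the previous result,
    // then emit every row of the current result.
    static List<String> fullReemit(List<String> previous, List<String> current) {
        List<String> messages = new ArrayList<>();
        for (String row : previous) {
            messages.add("-" + row);
        }
        for (String row : current) {
            messages.add("+" + row);
        }
        return messages;
    }

    // Incremental update: retract only the rows that dropped out of the
    // result, and emit only the rows that newly entered it.
    static List<String> incremental(List<String> previous, List<String> current) {
        Set<String> before = new HashSet<>(previous);
        Set<String> after = new HashSet<>(current);
        List<String> messages = new ArrayList<>();
        for (String row : previous) {
            if (!after.contains(row)) {
                messages.add("-" + row);
            }
        }
        for (String row : current) {
            if (!before.contains(row)) {
                messages.add("+" + row);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        List<String> afterInput1 = Arrays.asList("A", "B", "C");
        List<String> afterInput2 = Arrays.asList("A", "B", "D");
        System.out.println(fullReemit(afterInput1, afterInput2));  // [-A, -B, -C, +A, +B, +D]
        System.out.println(incremental(afterInput1, afterInput2)); // [-C, +D]
    }
}
```

Both variants leave a downstream consumer with the same final set [A, B, D];
the incremental one simply sends two messages instead of six.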

In reply to Rex Fenley <Re...@remind101.com>, Wed, 18 Nov 2020 at 14:32.

Re: Upsert UDFs

Posted by Rex Fenley <Re...@remind101.com>.
Hi,

Does this seem like it would help?

Thanks!

In reply to Rex Fenley <Re...@remind101.com>, Tue, Nov 10, 2020 at 10:38 PM.

Re: Upsert UDFs

Posted by Rex Fenley <Re...@remind101.com>.
Thanks! We did give that a shot and ran into the bug that I reported here
https://issues.apache.org/jira/browse/FLINK-20036 .

I'm also seeing this function

  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL

and it says it's more performant in some cases than

  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL

I'm having some trouble understanding in which cases it benefits
performance and whether it would help our case. Would using
`emitUpdateWithRetract` instead of `emitValue` reduce the number of
retracts we're seeing yet preserve the same end results, where our
Elasticsearch documents stay up to date?

In reply to Jark Wu <im...@gmail.com>, Sun, Nov 8, 2020 at 6:43 PM.

Re: Upsert UDFs

Posted by Jark Wu <im...@gmail.com>.
Hi Rex,

A similar question was asked recently [1], and I think the cause is the
same: retraction amplification.
You can try turning on the mini-batch optimization [2] to reduce the
retraction amplification.

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
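
To illustrate why batching can reduce retractions, here is a rough sketch in
plain Java. It is a simplified model, not Flink's implementation: it assumes a
per-key count aggregate and a fixed batch size, and the names
(`MiniBatchSketch`, `perRecord`, `miniBatch`, `update`) are hypothetical. It
does not use any Flink API or configuration option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchSketch {

    // Adds `delta` to the running count for `key` and appends the resulting
    // retract ("-") and accumulate ("+") messages to `out`.
    static void update(Map<String, Integer> counts, String key, int delta, List<String> out) {
        Integer old = counts.get(key);
        if (old != null) {
            out.add("-(" + key + "," + old + ")");
        }
        int now = (old == null ? 0 : old) + delta;
        counts.put(key, now);
        out.add("+(" + key + "," + now + ")");
    }

    // Without batching: every input record updates the aggregate immediately.
    static List<String> perRecord(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            update(counts, key, 1, out);
        }
        return out;
    }

    // With batching: records are pre-aggregated per key inside each batch,
    // so each key is updated at most once per batch. batchSize must be > 0.
    static List<String> miniBatch(List<String> keys, int batchSize) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += batchSize) {
            int end = Math.min(start + batchSize, keys.size());
            Map<String, Integer> deltas = new LinkedHashMap<>();
            for (String key : keys.subList(start, end)) {
                deltas.merge(key, 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> e : deltas.entrySet()) {
                update(counts, e.getKey(), e.getValue(), out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "a", "b");
        System.out.println(perRecord(keys));    // [+(a,1), -(a,1), +(a,2), -(a,2), +(a,3), +(b,1)]
        System.out.println(miniBatch(keys, 4)); // [+(a,3), +(b,1)]
    }
}
```

In this model the final counts are identical either way; batching only
removes the intermediate retract/accumulate pairs, at the cost of waiting for
the batch to fill.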

In reply to Rex Fenley <Re...@remind101.com>, Fri, 6 Nov 2020 at 03:56.

Re: Upsert UDFs

Posted by Rex Fenley <Re...@remind101.com>.
Also, just to be clear, our ES connector looks like this:

CREATE TABLE sink_es_groups (
  id BIGINT,
  -- ... a bunch of scalar fields
  array_of_ids ARRAY<BIGINT NOT NULL>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '${env:ELASTICSEARCH_HOSTS}',
  'index' = '${env:GROUPS_ES_INDEX}',
  'format' = 'json',
  'sink.bulk-flush.max-actions' = '512',
  'sink.bulk-flush.max-size' = '1mb',
  'sink.bulk-flush.interval' = '5000',
  'sink.bulk-flush.backoff.delay' = '1000',
  'sink.bulk-flush.backoff.max-retries' = '4',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)


In reply to Rex Fenley <Re...@remind101.com>, Thu, Nov 5, 2020 at 11:52 AM.