Posted to user@flink.apache.org by Maximilian Bode <ma...@tngtech.com> on 2016/01/21 15:35:12 UTC

Backpressure in the context of JDBCOutputFormat update

Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (performing a database update) is slower than the other (an insert). The job as a whole is also slow, since upstream operators are throttled by backpressure. I can speed up the whole job by introducing an a priori unnecessary .distinct(), which blocks downstream execution of the slow sink; that sink then seems to execute faster when given all the data at once.
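
For reference, the job has roughly the following shape (driver, URL,
queries, table and the tuple type are simplified placeholders, not our
actual schema):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class InsertUpdateJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // simplified source; the real job reads different data
        DataSet<Tuple2<Integer, String>> data = env.readCsvFile("/path/to/input")
                .types(Integer.class, String.class);

        // fast sink: plain INSERT
        data.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")   // placeholder
                .setDBUrl("jdbc:postgresql://host/db")    // placeholder
                .setQuery("INSERT INTO t (id, val) VALUES (?, ?)")
                .finish());

        // slow sink: UPDATE keyed on the primary key; tuple fields bind
        // positionally to the ?'s, hence the reordering via project()
        data.project(1, 0).output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://host/db")
                .setQuery("UPDATE t SET val = ? WHERE id = ?")
                .finish());

        env.execute("insert + update job");
    }
}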

Any ideas what is going on here? Is there something I can do without introducing unnecessary computation steps?

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com * 0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Backpressure in the context of JDBCOutputFormat update

Posted by Robert Metzger <rm...@apache.org>.
Hi,

have you thought about making two independent jobs out of this (or
calling execute() separately for the two parts)? One job for the update
and one for the insert?
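Something like this sketch, where readInput(), insertFormat() and
updateFormat() are placeholders for your source and the two
JDBCOutputFormats:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// first job: run the inserts to completion
readInput(env).output(insertFormat());
env.execute("insert job");

// second job: only then start the updates
readInput(env).output(updateFormat());
env.execute("update job");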

Even though the update operation should not be expensive, I think it's
helpful to understand the performance impact of running the inserts and
updates concurrently vs. executing them sequentially. Are the inserts and
updates performed on the same table?





On Thu, Jan 21, 2016 at 4:17 PM, Maximilian Bode <
maximilian.bode@tngtech.com> wrote:

> [...]

Re: Backpressure in the context of JDBCOutputFormat update

Posted by Maximilian Bode <ma...@tngtech.com>.
Hi Robert,
sorry, I should have been clearer in my initial mail. The two cases I was comparing are (sketched in code below):

1) distinct() before insert (which is necessary as we have a unique key constraint in our database), no distinct() before update
2) distinct() before insert AND distinct() before update
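
In code, the difference between the two cases is just the following
(data, insertFormat and updateFormat stand in for our actual source and
the two JDBCOutputFormats):

// case 1: distinct() only in front of the insert
data.distinct(0).output(insertFormat);
data.output(updateFormat);

// case 2: distinct() in front of both sinks
data.distinct(0).output(insertFormat);
data.distinct(0).output(updateFormat);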

The test data actually contains only unique values for the affected field, though, so the dataset size is not reduced in case 2.

In case 1 the insert does not start until all the data has arrived at distinct(), while the update is already running (slowing down upstream operators as well). In case 2 both sinks wait for their respective distinct() (which completes much faster now) and then start roughly at the same time, leading to a shorter net job time for case 2 compared to case 1.

A pause operator might be useful, yes.

The update should not be inherently much more expensive, as the WHERE clause contains only the table's primary key.
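
That is, the statement is of the following form (table and column names
are placeholders):

.setQuery("UPDATE mytable SET val = ? WHERE id = ?")  // id = primary key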

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com * 0176 1000 75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 21.01.2016, at 15:57, Robert Metzger <rm...@apache.org> wrote:
>
> [...]


Re: Backpressure in the context of JDBCOutputFormat update

Posted by Robert Metzger <rm...@apache.org>.
Hi Max,

is the distinct() operation reducing the size of the DataSet? If so, I
assume you have an idempotent update and the job is faster simply because
fewer updates are done.
If the distinct() operator is not changing anything, then the job might be
faster because the INSERT runs while Flink is still executing the
distinct() operation, so the insert is finished before the updates start.
This would mean that concurrent inserts and updates on the database are
much slower than doing them sequentially.

I'm wondering if there is a way in Flink to explicitly ask for spilling an
intermediate result to "pause" execution:

Source ----- > (spill for pausing) ---> (update sink)
        \
         ------- > (insert)

I don't have a lot of practical experience with RDBMS, but I guess updates
are slower because an index lookup + update is necessary. Maybe optimizing
the database configuration / schema / indexes is more promising. I think
it's indeed much nicer to avoid any unnecessary steps in Flink.

Did you do any "microbenchmarks" for the update and insert parts? I guess
that would help a lot to understand the impact of certain index structures,
batch sizes, or database drivers.
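
A minimal JDBC-only harness (outside of Flink) could look like the
following; driver, URL, credentials, statements and sizes are
placeholders. Running the loop once with the INSERT, once with the
UPDATE, and then both at the same time from two threads should show
whether the concurrent case is really the expensive one:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class JdbcBench {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://host/db", "user", "pw");  // placeholder
        conn.setAutoCommit(false);

        PreparedStatement ps =
                conn.prepareStatement("UPDATE t SET val = ? WHERE id = ?");
        long start = System.nanoTime();
        for (int i = 1; i <= 100000; i++) {
            ps.setString(1, "value-" + i);
            ps.setInt(2, i);
            ps.addBatch();
            if (i % 1000 == 0) {     // batch size to experiment with
                ps.executeBatch();
            }
        }
        ps.executeBatch();
        conn.commit();
        System.out.println("updates took "
                + (System.nanoTime() - start) / 1000000 + " ms");
        conn.close();
    }
}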

Regards,
Robert




On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <
maximilian.bode@tngtech.com> wrote:

> [...]