Posted to user@flink.apache.org by Bowen Li <bo...@gmail.com> on 2019/07/03 19:39:14 UTC

Re: Source Kafka and Sink Hive managed tables via Flink Job

BTW, I'm adding the user@ mailing list since this is a user question and
should be asked there.

The dev@ mailing list is only for discussions of Flink development. Please see
https://flink.apache.org/community.html#mailing-lists

On Wed, Jul 3, 2019 at 12:34 PM Bowen Li <bo...@gmail.com> wrote:

> Hi Youssef,
>
> You need to provide more background context:
>
> - Which Hive sink are you using? We are working on the official Hive sink
> for the community, and it will be released in 1.9. Did you develop yours in
> house?
> - What do you mean by 1st, 2nd, 3rd window? Do you mean the parallel
> instances of the same operator, or do you have 3 windowing
> operations chained?
> - What does your Hive table look like? E.g. is it partitioned or
> non-partitioned? If partitioned, how many partitions do you have? Is it
> writing in static partition or dynamic partition mode? What format? How
> large?
> - What does your sink do - is each parallel instance writing to multiple
> partitions or a single partition/table? Is it only appending data or
> upserting?
>
> On Wed, Jul 3, 2019 at 1:38 AM Youssef Achbany <
> youssef.achbany@euranova.eu> wrote:
>
>> Dear all,
>>
>> I'm working on a big project, and one of the challenges is to read Kafka
>> topics and copy them via Hive commands into Hive managed tables in order to
>> enable Hive ACID properties.
>>
>> I tried it, but I have an issue with back pressure:
>> - The first window read 20,000 events and wrote them to the Hive tables
>> - The second, third, ... windows send only 100 events because writing to
>> Hive takes more time than reading from a Kafka topic. But writing 100
>> events or 50,000 events takes roughly the same time for Hive.
>>
>> Has someone already built this kind of source and sink? Could you help
>> with this? Or do you have some tips?
>> It seems that defining a window size based on the number of events instead
>> of time is not possible. Is that true?
>>
>> Thank you for your help
>>
>> Youssef
>>
>> --
>> ♻ Be green, keep it on the screen
>>
>

Re: Source Kafka and Sink Hive managed tables via Flink Job

Posted by Bowen Li <bo...@gmail.com>.
Thanks Youssef. The context makes more sense to me now.

Just from your description, I suspect it might be because of the upsert - the
sink's throughput in step 1 is high but may get stuck in step 2. AFAIK, Hive
ACID/UPSERT is not really scalable; it's ok for rare, occasional usage but
cannot scale well to massive volumes.

I'd suggest you do a few tests:
1) find out what percentage of your data is upserts, and google what
percentage fits a Hive ACID/upsert use case
2) try changing step 2 from upsert to just append and see if the back
pressure goes away
3) make sure it's really the sink causing the backpressure (easily done from
the Flink UI, or via the REST API as in the sketch after this list), and
debug your sink (via logging, Java remote debugging, etc.) to see where the
bottleneck is
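
For 3), besides the web UI, the back pressure sampling is also exposed through
Flink's REST API. A minimal sketch in Java - the host/port and the job/vertex
IDs are placeholders you can copy from the UI or from GET /jobs:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

public class BackPressureCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder IDs - copy them from the Flink web UI or from GET /jobs.
        String url = "http://localhost:8081/jobs/<jobId>/vertices/<vertexId>/backpressure";
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in).useDelimiter("\\A")) {
            // Prints the raw JSON, including the per-subtask back pressure ratio.
            System.out.println(scanner.hasNext() ? scanner.next() : "");
        }
    }
}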

I think you can find the root cause with the above steps. Please report back
whether the inference is valid or not so we can help more users. In case you
find that Hive ACID is not the problem, please share some high-level code of
your job so we can take another look.

Bowen


On Thu, Jul 4, 2019 at 6:50 AM Youssef Achbany <yo...@euranova.eu>
wrote:

> Thank you Li for your answer and sorry for the dev mistake :).
>
> *To be more clear:*
>
> We write multiple events, assigned via a Flink tumbling window, to Hive in
> one JDBC INSERT statement. We wrote a Hive sink function for that, using
> only JDBC. We do not use partitions yet, but the table is clustered into
> buckets stored as ORC.
>
> We run the Flink job with parallelism 1 because Hive does not support
> multiple INSERT statements in parallel.
>
> We observe that the first instance of the tumbling window easily inserts
> tens of thousands of records into Hive, but the following windows only
> hundreds, probably because backpressure kicks in then.
>
> In addition, we have answered your questions inline below (highlighted in
> yellow in the original mail).
>
> Thank you
>
> Kind regards
>
>  -----Original Message-----
>
> From: Bowen Li [mailto:bowenli86@gmail.com]
>
> Sent: Wednesday, July 03, 2019 9:34 PM
>
> To: dev; youssef.achbany@euranova.eu
>
> Subject: Re: Source Kafka and Sink Hive managed tables via Flink Job
>
>  Hi Youssef,
>
>  You need to provide more background context:
>
> - Which Hive sink are you using? We are working on the official Hive sink
>
> for the community, and it will be released in 1.9. Did you develop yours in
>
> house?
>
> JDBC
>
>  - What do you mean by 1st, 2nd, 3rd window? Do you mean the parallel
> instances
>
> of the same operator, or do you have 3 windowing operations
>
> chained?
>
> No parallel instances, I was referring to the tumbling window
>
>  - What does your Hive table look like? E.g. is it partitioned or
>
> non-partitioned? If partitioned, how many partitions do you have? Is it
>
> writing in static partition or dynamic partition mode? What format? How
>
> large?
>
>  No partitioning done because of low volumes (<100K records)
>
> Format: ORC
>
> Batches of 20K records are processed in the first windows
>
>  - What does your sink do - is each parallel instance writing to multiple
>
> partitions or a single partition/table? Is it only appending data or
>
> upserting?
>
>  Single partition table, in 2 steps: (1) write to a temporary table
> (append), (2) execute SQL to upsert the historical table from the temporary
> table
>

Re: Source Kafka and Sink Hive managed tables via Flink Job

Posted by Youssef Achbany <yo...@euranova.eu>.
Thank you Li for your answer and sorry for the dev mistake :).

*To be more clear:*

We write multiple events, assigned via a Flink tumbling window, to Hive in
one JDBC INSERT statement. We wrote a Hive sink function for that, using
only JDBC. We do not use partitions yet, but the table is clustered into
buckets stored as ORC.

We run the Flink job with parallelism 1 because Hive does not support
multiple INSERT statements in parallel.
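
Schematically, the job looks like the sketch below. This is not our actual
code - topic, table and connection names are placeholders - and it assumes
Flink 1.8 with the universal Kafka connector and the Hive JDBC driver on the
classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class KafkaToHiveJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        kafkaProps.setProperty("group.id", "hive-loader");         // placeholder

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events-topic", new SimpleStringSchema(), kafkaProps));

        events
                // Collect each tumbling window into one batch ...
                .timeWindowAll(Time.minutes(1))
                .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<String> values,
                                      Collector<List<String>> out) {
                        List<String> batch = new ArrayList<>();
                        values.forEach(batch::add);
                        out.collect(batch);
                    }
                })
                // ... and write the whole batch with one INSERT statement.
                .addSink(new HiveBatchSink()).setParallelism(1);

        env.execute("kafka-to-hive");
    }

    /** Writes one multi-row INSERT per window batch through the Hive JDBC driver. */
    public static class HiveBatchSink extends RichSinkFunction<List<String>> {

        private transient Connection conn;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:hive2://hive-server:10000/default"); // placeholder
        }

        @Override
        public void invoke(List<String> batch, Context context) throws Exception {
            if (batch.isEmpty()) {
                return;
            }
            // Build a single multi-row INSERT for the whole window (one string column
            // here; real escaping and typing omitted for brevity).
            StringBuilder sql = new StringBuilder("INSERT INTO tmp_events VALUES ");
            for (int i = 0; i < batch.size(); i++) {
                if (i > 0) {
                    sql.append(", ");
                }
                sql.append("('").append(batch.get(i).replace("'", "''")).append("')");
            }
            try (Statement stmt = conn.createStatement()) {
                stmt.execute(sql.toString());
            }
        }

        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }
    }
}

Since timeWindowAll is a non-keyed window, that part of the job runs with
parallelism 1 anyway; the explicit setParallelism(1) on the sink just makes
the single-writer constraint visible.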

We observe that the first instance of the tumbling window easily inserts
tens of thousands of records into Hive, but the following windows only
hundreds, probably because backpressure kicks in then.

In addition, we have answered your questions inline below (highlighted in
yellow in the original mail).

Thank you

Kind regards

 -----Original Message-----

From: Bowen Li [mailto:bowenli86@gmail.com]

Sent: Wednesday, July 03, 2019 9:34 PM

To: dev; youssef.achbany@euranova.eu

Subject: Re: Source Kafka and Sink Hive managed tables via Flink Job

 Hi Youssef,

 You need to provide more background context:

- Which Hive sink are you using? We are working on the official Hive sink

for the community, and it will be released in 1.9. Did you develop yours in

house?

JDBC

 - What do you mean by 1st, 2nd, 3rd window? Do you mean the parallel instances

of the same operator, or do you have 3 windowing operations

chained?

No parallel instances, I was referring to the tumbling window

 - What does your Hive table look like? E.g. is it partitioned or

non-partitioned? If partitioned, how many partitions do you have? Is it

writing in static partition or dynamic partition mode? What format? How

large?

 No partitioning done because of low volumes (<100K records)

Format: ORC

Batches of 20K records are processed in the first windows

 - What does your sink do - is each parallel instance writing to multiple

partitions or a single partition/table? Is it only appending data or

upserting?

 Single partition table, in 2 steps: (1) write to a temporary table
(append), (2) execute SQL to upsert the historical table from the temporary
table
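
Expressed in Hive SQL over JDBC, the two steps are roughly the sketch below.
Table and column names are placeholders; the MERGE in step 2 assumes Hive
2.2+ and a transactional (ACID), bucketed ORC historical table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TwoStepUpsert {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://hive-server:10000/default");
             Statement stmt = conn.createStatement()) {

            // Step 1: append the window's rows into the staging table (the sink in
            // the job does this with one multi-row INSERT per tumbling window).
            stmt.execute("INSERT INTO tmp_events VALUES ('42', 'payload-a'), ('43', 'payload-b')");

            // Step 2: upsert the staged rows into the historical ACID table.
            stmt.execute(
                "MERGE INTO historical_events AS h USING tmp_events AS t "
                    + "ON h.event_id = t.event_id "
                    + "WHEN MATCHED THEN UPDATE SET payload = t.payload "
                    + "WHEN NOT MATCHED THEN INSERT VALUES (t.event_id, t.payload)");

            // Clear the staging table before the next window's batch.
            stmt.execute("TRUNCATE TABLE tmp_events");
        }
    }
}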


-- 
♻ Be green, keep it on the screen