Posted to user@flink.apache.org by Dominique De Vito <dd...@gmail.com> on 2020/01/21 09:01:33 UTC

about registering completion function for worker shutdown

Hi,

For a Flink batch job, some values are written to Kafka through a
Producer.

I want to register a hook for closing (at the end) the Kafka producer a
worker is using... a hook to be executed, of course, on the worker side.

Is there a way to do so?

Thanks.

Regards,
Dominique

Re: about registering completion function for worker shutdown

Posted by Robert Metzger <rm...@apache.org>.
Hi,
I now realize that you are using the batch API, and I gave you an answer
for the streaming API :(
The mapPartition function also has a close() method, which you can use to
implement the same pattern.
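
A minimal sketch of that pattern (the class name, broker address, and
topic below are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: one producer per parallel task, created in open() and
// released in close(), which Flink calls when the task shuts down.
public class KafkaWritingPartitionMapper
        extends RichMapPartitionFunction<String, String> {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void mapPartition(Iterable<String> values, Collector<String> out) {
        for (String value : values) {
            producer.send(new ProducerRecord<>("my-topic", value)); // placeholder topic
            out.collect(value);
        }
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close(); // flushes buffered records, then releases resources
        }
    }
}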
With a JVM shutdown hook, you are assuming that the TaskManager shuts
down at the end of your job. Whether that happens depends on the
deployment mode: you might get unexpected results if you deploy Flink
in a way that keeps the TaskManagers around.


Re: about registering completion function for worker shutdown

Posted by Dominique De Vito <dd...@gmail.com>.
Hi Robert,

Thanks for your hint / reply / help.

So far I have not tested your approach (maybe next), but I tried
another one:

* use mapPartition
-- at the beginning, get a KafkaProducer
-- the KafkaProducerFactory class I use is lazy and caches the first
instances created, so producers are reused

* register a JVM shutdown hook for closing the KafkaProducer (a
simplified sketch follows below)
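
Roughly, the idea looks like this (a simplified sketch; the names and
the config handling of my real factory differ):

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.kafka.clients.producer.KafkaProducer;

// Simplified sketch: a lazy, caching factory plus a JVM shutdown hook.
public final class KafkaProducerFactory {

    // One cached producer per config key, shared within the same JVM.
    private static final ConcurrentMap<String, KafkaProducer<String, String>>
            CACHE = new ConcurrentHashMap<>();

    public static KafkaProducer<String, String> getOrCreate(
            String configKey, Properties props) {
        return CACHE.computeIfAbsent(configKey, key -> {
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // Close the cached producer when the JVM (the TaskManager) exits.
            Runtime.getRuntime().addShutdownHook(new Thread(producer::close));
            return producer;
        });
    }

    private KafkaProducerFactory() {}
}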

So far I have hit some performance issues, but I don't know yet whether
they are due to my pattern or to something else.

Anyway, thanks.

Regards,
Dominique


Re: about registering completion function for worker shutdown

Posted by Robert Metzger <rm...@apache.org>.
Hi,

Flink's ProcessFunction has a close() method, which is executed on shutdown
of the workers. (You could also use any of the Rich* functions for that
purpose).
If you add a ProcessFunction with the same parallelism before the
KafkaSink, it'll be executed on the same machines as the Kafka producer.
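
A rough sketch (ShutdownAwareFunction, the stream, and the parallelism
below are placeholders):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: a pass-through function whose close() runs on the worker
// when the task shuts down.
public class ShutdownAwareFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value); // forward records unchanged
    }

    @Override
    public void close() {
        // put your cleanup here; mind the shutdown timeout mentioned below
    }
}

// Wiring it in, right before the sink and with the sink's parallelism:
//
//   stream.process(new ShutdownAwareFunction()).setParallelism(p)
//         .addSink(kafkaSink).setParallelism(p);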

Afaik, the close() call should not take forever: the system might
interrupt your thread if it doesn't finish closing in time (30s is the
default for "cluster.services.shutdown-timeout").

Best,
Robert

