Posted to user@flink.apache.org by Robert Metzger <rm...@apache.org> on 2020/05/29 11:09:56 UTC

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Hi Arnaud,

Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());

The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction with
open() and close() where you can handle the connection with Kudu's
partitioning service.
The map will output a Tuple2<PartitionId, Data> (or something nicer :) ),
then Flink shuffles your data correctly, and the sinks will process the
data correctly partitioned.
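
The tagging step described above can be sketched without Flink on the classpath. The plain-Java model below is only an illustration under assumptions: PartitionLookup and PartitionTaggingMapper are hypothetical names, the lookup stands in for whatever Kudu client call resolves a row to a partition, and Map.Entry stands in for Tuple2. In a real job the class would extend RichMapFunction, and Flink would call open() and close() itself.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a client that resolves an element to a partition id. */
interface PartitionLookup<T> extends AutoCloseable {
    int partitionOf(T element);

    /** No-op by default; a real client would release its connection here. */
    @Override
    default void close() {}
}

/**
 * Models the lifecycle of a rich map function: open() once, map() per element,
 * close() once. This is a framework-free sketch, not a Flink class.
 */
class PartitionTaggingMapper<T> {
    private final Supplier<PartitionLookup<T>> lookupFactory;
    private PartitionLookup<T> lookup;

    PartitionTaggingMapper(Supplier<PartitionLookup<T>> lookupFactory) {
        this.lookupFactory = lookupFactory;
    }

    /** Acquire the (expensive) lookup resource once per mapper instance. */
    void open() {
        lookup = lookupFactory.get();
    }

    /** Tag the element with its partition id; the element itself is unchanged. */
    Map.Entry<Integer, T> map(T element) {
        return new SimpleImmutableEntry<>(lookup.partitionOf(element), element);
    }

    /** Release the resource; safe to call even if open() never ran. */
    void close() {
        if (lookup != null) {
            lookup.close();
            lookup = null;
        }
    }
}
```

Downstream, keying on the entry's key groups elements by partition ID, and a second map can drop the tag again before the sink.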

I hope that this is what you were looking for!

Best,
Robert

On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hello,
>
>
>
> I would like to improve the performance of my Apache Kudu sink by using
> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions
> with Kudu partitions and so reduce network shuffling.
>
> For that, I would like to implement something like
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
> KuduSink(…));
>
> with KuduFlinkPartitioner being an implementation of org.apache.flink.api.common.functions.Partitioner
> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>
>
>
> However, for that KuduPartitioner to work, it needs to open (and close at
> the end) a connection to the Kudu table, which obviously can’t be done
> for each record. But there is no “AbstractRichPartitioner” with
> open() and close() methods that I can use for that (the way I do in the
> sink, for instance).
>
>
>
> What is the best way to implement this?
>
> I thought of ThreadLocals that would be initialized during the first call
> to int partition(K key, int numPartitions), but I won’t be able to
> close() things cleanly as I won’t be notified on job termination.
>
>
>
> I also thought of putting those static ThreadLocals inside an “identity mapper”
> that would be called just before the partitioner, with something like:
>
> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new
> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with Kudu connections initialized in the mapper's open(), closed in the
> mapper's close(), and used in the partitioner's partition().
>
> It looks like an ugly hack breaking every coding principle, but as
> long as the threads are reused between the mapper and the partitioner, I
> think that it should work.
>
>
>
> Is there a better way to do this?
>
>
>
> Best regards,
>
> Arnaud

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I agree with Robert that adding open/close support for partitioners 
would mean additional complexity in the code base. We're currently not 
thinking of supporting that.

Best,
Aljoscha

On 05.06.20 20:19, Arvid Heise wrote:
> Hi Arnaud,
> 
> just to add to this: the overhead of the additional map is negligible if you
> enable object reuse [1].
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html


Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Arnaud,

just to add to this: the overhead of the additional map is negligible if you
enable object reuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger <rm...@apache.org> wrote:

> I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
> correct me if needed:
>
> Partitioners are not regular operators (like a map or window), so they
> are not part of the regular task lifecycle (open() / map() etc. / close(),
> with the proper error handling, task cancellation mechanisms etc.). The
> custom partition function is called somewhere close to the network stack.
> It would take quite a lot of effort (and add complexity to the codebase) to
> allow for rich partitioners. Given that custom partitioners are a rarely
> used feature, spending a lot of time on this would not be justified
> (there's also a good workaround available).


Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Posted by Robert Metzger <rm...@apache.org>.
I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
correct me if needed:

Partitioners are not regular operators (like a map or window), so they
are not part of the regular task lifecycle (open() / map() etc. / close(),
with the proper error handling, task cancellation mechanisms etc.). The
custom partition function is called somewhere close to the network stack.
It would take quite a lot of effort (and add complexity to the codebase) to
allow for rich partitioners. Given that custom partitioners are a rarely
used feature, spending a lot of time on this would not be justified
(there's also a good workaround available).
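
For reference, the contract a custom partitioner has to fulfil is a single call per record, int partition(K key, int numPartitions), which is why there is no natural place in it for setup or teardown. A minimal sketch, using a local interface so that it compiles on its own: KeyPartitioner and HashKeyPartitioner are hypothetical names that only mirror that signature and are not Flink's own types.

```java
/** Local mirror of the single-method partitioner shape; not Flink's own interface. */
@FunctionalInterface
interface KeyPartitioner<K> {
    /** Returns the target partition index in [0, numPartitions). */
    int partition(K key, int numPartitions);
}

/** A stateless partitioner: everything it needs arrives as method arguments. */
class HashKeyPartitioner<K> implements KeyPartitioner<K> {
    @Override
    public int partition(K key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Anything that needs a connection, such as a Kudu partition lookup, does not fit this shape, which is what makes the map-based workaround attractive.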


On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hello,
>
>
>
> Yes, that would definitely do the trick, with an extra mapper after keyBy
> to remove the tuple so that it stays seamless. It’s less hacky than what I
> was thinking of, thanks!
>
> However, is there any plan to have rich partitioners in a future release?
> That would avoid adding overhead and “intermediate” technical info to the
> stream payload.
>
> Best,
>
> Arnaud

RE: Best way to "emulate" a rich Partitioner with open() and close() methods ?

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hello,

Yes, that would definitely do the trick, with an extra mapper after keyBy to remove the tuple so that it stays seamless. It’s less hacky than what I was thinking of, thanks!
However, is there any plan to have rich partitioners in a future release? That would avoid adding overhead and “intermediate” technical info to the stream payload.
Best,
Arnaud
