Posted to user@flink.apache.org by Maximilian Michels <mx...@apache.org> on 2016/08/12 10:06:46 UTC

Re: Using CustomPartitionerWrapper with KeyedStream

Hi Philippe,

There is no particular reason other than hash partitioning is a
sensible default for most users. It seems like this is rarely an
issue. When the number of keys is close to the parallelism, having
idle partitions is usually not a problem due to low data volume. I see
that it could be a problem if you had multiple "hotspot" keys, but then
you would have a hard time parallelizing the workload anyway.
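
To make that concrete, here is a minimal sketch, not part of the original mail, of how the hash-then-modulo assignment used for keyed streams can send two distinct keys to the same channel and leave another channel idle. The import path for MathUtils, the example keys, and the parallelism of 4 are assumptions:

import org.apache.flink.util.MathUtils;

public class ChannelSkewSketch {
    public static void main(String[] args) {
        // Hypothetical scenario: four distinct keys, downstream parallelism of 4.
        String[] keys = {"sensor-a", "sensor-b", "sensor-c", "sensor-d"};
        int numberOfOutputChannels = 4;
        for (String key : keys) {
            // Same assignment as the HashPartitioner line quoted further down.
            int channel = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
            System.out.println(key + " -> channel " + channel);
        }
        // Depending on the actual key strings, two keys may land on the same
        // channel while another channel receives nothing.
    }
}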

Does this limitation really impact performance for you, or is this
question of a theoretical nature? :) In any case, we could file an issue
and allow other partitioners for keyed streams.

Best,
Max


On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy
<ph...@orange.fr> wrote:
> Hi there,
>
> It does not seem possible to use a custom partitioner in the context of a
> KeyedStream without modifying the KeyedStream itself.
>
>
> protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
>     throw new UnsupportedOperationException(
>         "Cannot override partitioning for KeyedStream.");
> }
>
> In some particular situations, such as when the number of keys is small and
> close to the number of partitions, using
> keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
>
> might result in collisions in the partition indexes (and hence empty
> partitions) assigned by the HashPartitioner that is imposed on the
> KeyedStream:
>
> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
>         TypeInformation<KEY> keyType) {
>     super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
>         dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
>     this.keySelector = keySelector;
>     this.keyType = keyType;
> }
>
> due to the characteristics of the underlying hash function (any hash function, really):
>
> returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
>
> Is there a particular reason to force the KeyedStream to use a
> HashPartitioner?
>
> Thanks in advance and best regards.

Re: Using CustomPartitionerWrapper with KeyedStream

Posted by Philippe Caparroy <ph...@orange.fr>.
Hi Max,

Thanks for the answer.
I needed to ensure that, in the output stream of a parallel window operation (which relies on a KeyedStream), each partition contains a single key.
I can obtain this by applying a custom partitioner just after the window (a rough sketch follows below), but relying on the partitioner of the KeyedStream itself would avoid that extra transformation.
I was just wondering if there was a particular reason to limit the partitioner of the KeyedStream to a HashPartitioner.
I have no bottleneck or performance problems anyway.
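
For reference, here is a minimal sketch of that workaround, not code from this thread: the key set, the Tuple2 element type, and the count window are made up for illustration. It applies DataStream#partitionCustom right after the window operation so that each key of a small, known key set ends up in its own partition:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OneKeyPerPartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Hypothetical input: a small, known key set ("a", "b", "c").
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("c", 3L),
                Tuple2.of("a", 4L), Tuple2.of("b", 5L), Tuple2.of("c", 6L));

        // Parallel window operation on the KeyedStream (hash partitioning imposed here).
        DataStream<Tuple2<String, Long>> windowed = events
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .countWindow(2)
                .sum(1);

        // Workaround: custom partitioner applied after the window, pinning each
        // key of the known key set to its own partition index.
        DataStream<Tuple2<String, Long>> repartitioned = windowed.partitionCustom(
                new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        if ("a".equals(key)) return 0;
                        if ("b".equals(key)) return 1;
                        return 2 % numPartitions;
                    }
                },
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                });

        repartitioned.print();
        env.execute("one key per partition sketch");
    }
}

With an explicit key-to-index mapping like this, the assignment no longer depends on hashCode(), so no two keys can collide as long as the parallelism is at least the number of keys.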

Best regards.
> On 12 Aug 2016, at 12:06, Maximilian Michels <mx...@apache.org> wrote:
> 
> Hi Philippe,
> 
> There is no particular reason other than hash partitioning is a
> sensible default for most users. It seems like this is rarely an
> issue. When the number of keys is close to the parallelism, having
> idle partitions is usually not a problem due to low data volume. I see
> that it could be a problem if you had multiple "hotspot" keys, but then
> you would have a hard time parallelizing the workload anyway.
> 
> Does this limitation really impact performance for you, or is this
> question of a theoretical nature? :) In any case, we could file an issue
> and allow other partitioners for keyed streams.
> 
> Best,
> Max