Posted to dev@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2018/08/19 08:43:59 UTC

[DISCUSS] Change the Keyed partitioning behavior of the Kafka Producer API

Hi,

A while ago we found that when you construct a Kafka Producer, it always
uses the FlinkFixedPartitioner to spread the data across the Kafka
partitions, except when you give it a custom partitioner.

Because we want all our elements to be partitioned by the key of the
records, we created this issue and put up a pull request with a
simple FlinkKeyHashPartitioner.

https://issues.apache.org/jira/browse/FLINK-9610

A comment by one of the reviewers (Tzu-Li Tai) was essentially: "Kafka does
this by default already, why this change?"

So I dug a lot deeper to understand how the partitioning decisions and the
data flow from the Flink API down into the Kafka producer client code.

My conclusions:
1) The Kafka producer code uses the provided partitioner; if it doesn't
have one, it uses the hash of the key; and if there is no key either, it
falls back to a round-robin distribution (see the sketch after this list).
2) The Flink Kafka producer constructors are available in variants with
and without a partitioner. Even if you provide a valid key for each record,
it will still use the FlinkFixedPartitioner if no explicit partitioner has
been specified.
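
To make 1) concrete, the decision chain inside the Kafka client looks
roughly like this (a paraphrase of the behavior, not the literal
DefaultPartitioner source):

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.utils.Utils;

    class PartitionChoiceSketch {
        private final AtomicInteger roundRobin = new AtomicInteger();

        // Paraphrase of the Kafka client's partition selection:
        // explicit partition first, then key hash, then round robin.
        int choosePartition(ProducerRecord<byte[], byte[]> record, int numPartitions) {
            if (record.partition() != null) {
                return record.partition();  // 1) a partitioner already chose one
            }
            if (record.key() != null) {
                // 2) murmur2 hash of the serialized key, as in Kafka's DefaultPartitioner
                return Utils.toPositive(Utils.murmur2(record.key())) % numPartitions;
            }
            // 3) no key at all: spread the records round robin
            return Utils.toPositive(roundRobin.getAndIncrement()) % numPartitions;
        }
    }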

Looking at the code (I haven't tried it), you can actually get the desired
behavior without any code changes by using the constructor that requires a
partitioner and passing it a null value.
Yuck!
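
For the record, a minimal sketch of that workaround (untested, assuming the
FlinkKafkaProducer011 constructor that takes an Optional partitioner; the
schema, topic, and broker address are only illustrative). Passing an empty
Optional makes the sink leave the partition unset, so the Kafka client
hashes the key itself:

    import java.nio.charset.StandardCharsets;
    import java.util.Optional;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    class NullPartitionerWorkaround {

        // A trivial keyed schema for "key:value" strings, split on the first ':'.
        static class StringKeyedSchema implements KeyedSerializationSchema<String> {
            @Override
            public byte[] serializeKey(String element) {
                return element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public byte[] serializeValue(String element) {
                return element.split(":", 2)[1].getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String getTargetTopic(String element) {
                return null;  // fall back to the topic given to the producer
            }
        }

        static FlinkKafkaProducer011<String> createProducer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");

            // Explicitly passing an empty partitioner (instead of omitting the
            // argument) avoids the FlinkFixedPartitioner default, so the Kafka
            // client partitions by the hash of the serialized key.
            return new FlinkKafkaProducer011<>(
                    "my-topic",
                    new StringKeyedSchema(),
                    props,
                    Optional.<FlinkKafkaPartitioner<String>>empty());
        }
    }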

In my opinion, providing a KeyedSerializationSchema is an implicit way of
specifying that you want the data to be partitioned by that key.

So to make this a workable situation, I see three ways to handle it:
1) We merge something like the partitioner I proposed.
2) We change the constructors that get a KeyedSerializationSchema to use
that key for partitioning.
3) We remove all constructors that have a KeyedSerializationSchema because
the key is never used anyway.

I think 3) is bad, 1) is okay, and 2), although it breaks backward
compatibility, is the best solution.

So to clarify the change I propose here:
We change the behavior of all Flink producer constructors that have
a KeyedSerializationSchema parameter and NO partitioner.
The proposed change is that, because we HAVE a key and we do NOT have a
partitioner, the partitioning is done by the partitioning code that already
exists in the underlying Kafka client.
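
In code terms, the change boils down to the record construction in the
producer's invoke() (a sketch of the intended behavior, not an actual
patch; the field and variable names only mirror the real producer):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch of the proposed invoke() behavior, not actual Flink source code.
    class ProposedInvokeSketch<IN> {
        KeyedSerializationSchema<IN> schema;
        FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;  // null when not specified
        KafkaProducer<byte[], byte[]> producer;
        Callback callback;
        int[] partitions;

        void invoke(IN next, String targetTopic) {
            byte[] serializedKey = schema.serializeKey(next);
            byte[] serializedValue = schema.serializeValue(next);

            ProducerRecord<byte[], byte[]> record;
            if (flinkKafkaPartitioner != null) {
                // An explicit partitioner was given: Flink picks the partition.
                record = new ProducerRecord<>(
                        targetTopic,
                        flinkKafkaPartitioner.partition(
                                next, serializedKey, serializedValue, targetTopic, partitions),
                        serializedKey,
                        serializedValue);
            } else {
                // Keyed schema and NO partitioner: leave the partition unset,
                // so the Kafka client partitions by the hash of the key.
                record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
            }
            producer.send(record, callback);
        }
    }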

So for the rest of the constructors the behavior remains unchanged:
- With a NON-keyed SerializationSchema
- With a provided partitioner

What do you guys think?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: [DISCUSS] Change the Keyed partitioning behavior of the Kafka Producer API

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Niels,

Your conclusions are accurate, and I also agree that the combination of
the KeyedSerializationSchema, provided partitioners, etc. is all a bit
awkward in its current state.

As for the proposed solutions, I personally disagree with 1), since key
partitioning, IMO, should be the default behavior. There have actually
already been discussions about making that happen once a Kafka connector
remake happens in the future.
And yes, 2) seems to be the best solution here.

To round up solution 2), we have:
- A constructor that takes a KeyedSerializationSchema and NO partitioner.
This implicitly uses Kafka's key partitioning.
- A constructor that takes a SerializationSchema and an
Optional<FlinkKafkaPartitioner>. By default, the `FlinkFixedPartitioner` is
used; if an empty Optional is provided, we use round-robin partitioning.
This would be a breaking change, though, because the default partitioning
behaviour for the KeyedSerializationSchema variant would change. A
signature sketch follows below.
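
In signature form (a hypothetical constructor pair, purely to illustrate
the round-up; neither exists like this today):

    import java.util.Optional;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    // Hypothetical constructor pair for solution 2); illustration only.
    class RoundedUpKafkaProducer<IN> {

        // Keyed schema, NO partitioner: implicitly use Kafka's key partitioning.
        RoundedUpKafkaProducer(String topic,
                               KeyedSerializationSchema<IN> schema,
                               Properties producerConfig) {
            // ...
        }

        // Non-keyed schema plus Optional partitioner: FlinkFixedPartitioner by
        // default, round-robin when an empty Optional is passed explicitly.
        RoundedUpKafkaProducer(String topic,
                               SerializationSchema<IN> schema,
                               Properties producerConfig,
                               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
            // ...
        }
    }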

I would vote against introducing a breaking change now, since key
partitioning is still achievable right now (although admittedly in a very
non-friendly way).
Instead, we should only incorporate these ideas when the Kafka connector
remake happens.
There have already been thoughts about doing this, triggered by many other
aspects (reworking Flink's source interface, having a common abstraction
for efficient partition discovery / idleness detection in partition-based
replayable sources, etc.).

Overall, I think that this discussion also brings up another aspect of the
`KeyedSerializationSchema`: it bundles too many concerns within a single
interface (a sketch of a possible split follows after this list).
1. It defines the serialization.
2. It extracts the partitioning key for each record, even though that key
may never be used (e.g. when a custom partitioner ignores it). This might
have been better off in a separate `KafkaKeyExtractor`, for example.
3. It decides the target topic for each record, which may be more suitable
in the `FlinkKafkaPartitioner` interface.
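
A sketch of what such a split could look like (hypothetical interfaces,
none of which exist in Flink; `KafkaKeyExtractor` is the name floated
above, the other two names are made up for illustration):

    import java.io.Serializable;

    // Hypothetical interfaces splitting the three concerns; none exist in Flink.

    /** Concern 1: serialization only. */
    interface KafkaValueSerializer<T> extends Serializable {
        byte[] serializeValue(T element);
    }

    /** Concern 2: key extraction, consulted only when key partitioning is wanted. */
    interface KafkaKeyExtractor<T> extends Serializable {
        byte[] serializeKey(T element);
    }

    /** Concern 3: topic routing, arguably closer to the partitioner's job. */
    interface KafkaTopicSelector<T> extends Serializable {
        String getTargetTopic(T element);
    }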

Cheers,
Gordon
