You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Michal Hariš <mi...@gmail.com> on 2016/06/16 11:49:40 UTC

kafka partition assignment

Hi, I was recently looking into a kafka connector issue (FLINK-4023 /
FLINK-4069), when it was pointed out that partition assignment will not be
deterministic if the partition discovery is imply moved to the open()
method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
__index__ of the total list of topic-partitions subscribed to is used. It
is clear that calling it from the open() method in each task can produce
different lists and so some partitions can be consumed multiple times while
others not consumed at all. In a related discussion it was suggested to
build this list in a deterministic way so that each partitions sees the
same index for the same topic-partition. This would work for the issues
above, but it highlighted for me another issue which relates to partition
assignment itself - hence starting a different thread.

It don't understand at this point the way how Flink does co-group on
multiple topics but having worked in the Kafka zone for a number of years,
ignoring the physical partition id which is deterministic at the Kafka
cluster level, and using a transient list (even if it is constructed
deterministically) means that co-partitioning cannot be exploited for a
straight co-group and Flink has to always do its own shuffle. I think using
getPartition() on each topic-partition instead of list index in the
assignPartition is necessary, even if it may result in an unbalanced work
distribution among Flink consumer instances. But it seems to me that in
Flink, the partitioning schemes that exist outside its runtime are ignored.
Is it because any source outside Flink's realm is treated as to be imported
and no partitioning is assumed for simplicity/control or is it because this
is expected to produce even load-balancing of work? What else am I missing?

Michal

Re: kafka partition assignment

Posted by Michal Hariš <mi...@gmail.com>.
Hi Tai, OK, thanks for confirming. I understand that streaming shuffle
is cheaper than batch-spill shuffle, but nevertheless may be unacceptable in
large volume applications - it is still a network shuffle and that's
the biggest part of the cost.

Now on the point of the trade off between load-balancing the initial
work in Flink vs. one-less-shuffle. The problem with it is that this
trade-off doesn't exist. Using physical partition ID will not only
preserve existing co-partitioning but will also produce more balanced
initial work for Flink. This is because partitioning at the transport
layer (i.e. Kafka or other) is already balanced with respect to the
actual volumes within each topic. Putting all topic-partitions in a
flat list and taking index for assignment, you have a good chance that
all partitions from say high-volume topic will end up in the same
flink task and a set of very sparse topics, which may have the same
number of partitions for whatever reasons will go to the remainder of
flink tasks leading to a massive skew in balancing of the work.

On the point of design philosophy of partitioning I can only give you
my opinion because Kafka, and probably any distributed data
infrastructure will impose partitioning but leave the actual
partitioning scheme to the application. The particular partitioning
scheme is indeed a concern of stream-processing layer, however, it is
most efficient if it is provided by the transport layer. From the
experience of large landscapes of data sources and stream-processing
applications where a given partitioning scheme may be useful to more
than just a single workflow, it is quite essential for the
stream-processors to have an understanding which partitioning schemes
exist and can be leveraged for their computation. That is how for
example Samza co-exists with Kafka for creating real-time dynamic
queries. Intermediate shuffles can be exposed via Kafka and several
queries as well as any stream-processing framework can re-use this
shuffle for their own computation. I guess, Flink is trying
encapsulate this whole process within its own optimizer entirely,
which is not a bad idea and can lead to simplification in how such a
system would be architected, but it should still accept that
partitioning schemes that exist at the transport layer are more
efficient and better balanced to start with.


On 6/21/16, Tai Gordon <tz...@gmail.com> wrote:
> Hi Michal,
>
> I see, thanks for the description. I think you’ve definitely raised an
> interesting point.
> Yes, there may be unnecessary shuffle in this case (the partitionCustom on
> consumed DataStream doesn’t override the assignPartitions in the Kafka
> connector; custom partitioners are applied after the UDF transformation).
> It’s noteworthy that this data exchange between tasks will be a “streaming
> shuffle”, so I think the cost won’t be as high compared to other streaming
> systems that require shuffle spills. I’m not familiar with how streaming
> data shuffle in Flink works exactly though, I’m simply referring to this
> doc [1], so anyone who’s knowledgable of this part please correct me if I’m
> wrong.
>
> On the other hand, I’m curious about how the design philosophy of
> partitioning for systems like Kafka (and AWS Kinesis like-wise), and how it
> should work with Flink.
> As far as I know, Kafka’s partitioning is meant for distributing messages
> for scale and load-balancing, as well as for retaining even-order for
> messages with the same partition key. It doesn’t provide information about
> whether 2 topics are actually co-partitioned. If consuming streaming
> frameworks like Flink is to take existing co-partitioning of messages in
> Kafka into account, it’d have to assume this every time, and, like you
> mentioned, use the partition id for assigning and sacrifice work balance
> between subtasks. Perhaps we could make it an option in the connector, but
> I’m in doubt if it is reasonable for users to sacrifice work balance and
> initial throughput for the excessive streaming shuffle. Flink provides a
> lot of ways for key extraction on data streams, so for complex topologies
> there will most likely be shuffle downstream anyways.
>
> Regards,
> Gordon
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
>
>
> On June 21, 2016 at 4:02:01 AM, Michal Hariš (michal.harish@gmail.com)
> wrote:
>
> Hi Tai, I was referring to co-partitioning, not co-location of leaders,
> i.e. multiple topics that share the same partitioning scheme. By example,
> say I have 2 topics which share the same keyspace and which are produced by
> something other than Flink using identical partitioner. The data in these 2
> topics is already co-partitioned and shouldn't require any shuffle for
> aggregations by key. From the partition assignment code and your
> explanation (and the docs) I understand that the kafka connector will
> assign the partitions to operator subtasks by iterating over the list,
> thereby most likely breaking the existing co-partitioning, and so even if I
> provide partitionCustom with the exact same code that was used to partition
> the data in the external producer, any aggregation by the existing message
> key will have to still incur the unnecessary shuffle. Correct ? Or does
> partitionCustom on the data stream somehow override the behaviour of
> assignPartitions on the source ?
>
>
> On Mon, Jun 20, 2016 at 11:01 AM, Tai Gordon <tz...@gmail.com> wrote:
>
>> Hi Michal,
>>
>> Whether or not the external system's partitioning scheme is referenced
> when
>> assigning
>> partitions to the consumer parallel subtasks depends on the
> implementation
>> of each connector / source.
>>
>> First, clarification on “co-partitioning": from your context I’m assuming
>> you’re referring to co-location of Kafka partition leaders? If so, as you
>> correctly identified, the current Kafka consumer connecter does not take
>> the co-location of Kafka partitions into account when assigning
> partitions.
>> To exploit this, though, even if the deterministic assignment takes
> leader
>> location into account, subtasks will also need to be co-located with the
>> leaders. As far as I know, there’s currently no way for subtask’s to
> access
>> their location info at runtime.
>> On the other hand, if you’re referring to “co-partitioning the data"
>> consumed from Kafka, you can use a custom partitioner on the consumed
> data
>> stream:
>>
>>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/#physical-partitioning
>> .
>>
>> Hope this helps!
>>
>> Regards,
>> Gordon
>>
>> On June 16, 2016 at 7:50:05 PM, Michal Hariš (michal.harish@gmail.com)
>> wrote:
>>
>> Hi, I was recently looking into a kafka connector issue (FLINK-4023 /
>> FLINK-4069), when it was pointed out that partition assignment will not
> be
>> deterministic if the partition discovery is imply moved to the open()
>> method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
>> __index__ of the total list of topic-partitions subscribed to is used. It
>> is clear that calling it from the open() method in each task can produce
>> different lists and so some partitions can be consumed multiple times
> while
>> others not consumed at all. In a related discussion it was suggested to
>> build this list in a deterministic way so that each partitions sees the
>> same index for the same topic-partition. This would work for the issues
>> above, but it highlighted for me another issue which relates to partition
>> assignment itself - hence starting a different thread.
>>
>> It don't understand at this point the way how Flink does co-group on
>> multiple topics but having worked in the Kafka zone for a number of
> years,
>> ignoring the physical partition id which is deterministic at the Kafka
>> cluster level, and using a transient list (even if it is constructed
>> deterministically) means that co-partitioning cannot be exploited for a
>> straight co-group and Flink has to always do its own shuffle. I think
> using
>> getPartition() on each topic-partition instead of list index in the
>> assignPartition is necessary, even if it may result in an unbalanced work
>> distribution among Flink consumer instances. But it seems to me that in
>> Flink, the partitioning schemes that exist outside its runtime are
> ignored.
>> Is it because any source outside Flink's realm is treated as to be
> imported
>> and no partitioning is assumed for simplicity/control or is it because
> this
>> is expected to produce even load-balancing of work? What else am I
> missing?
>>
>> Michal
>>
>

Re: kafka partition assignment

Posted by Tai Gordon <tz...@gmail.com>.
Hi Michal,

I see, thanks for the description. I think you’ve definitely raised an
interesting point.
Yes, there may be unnecessary shuffle in this case (the partitionCustom on
consumed DataStream doesn’t override the assignPartitions in the Kafka
connector; custom partitioners are applied after the UDF transformation).
It’s noteworthy that this data exchange between tasks will be a “streaming
shuffle”, so I think the cost won’t be as high compared to other streaming
systems that require shuffle spills. I’m not familiar with how streaming
data shuffle in Flink works exactly though, I’m simply referring to this
doc [1], so anyone who’s knowledgable of this part please correct me if I’m
wrong.

On the other hand, I’m curious about how the design philosophy of
partitioning for systems like Kafka (and AWS Kinesis like-wise), and how it
should work with Flink.
As far as I know, Kafka’s partitioning is meant for distributing messages
for scale and load-balancing, as well as for retaining even-order for
messages with the same partition key. It doesn’t provide information about
whether 2 topics are actually co-partitioned. If consuming streaming
frameworks like Flink is to take existing co-partitioning of messages in
Kafka into account, it’d have to assume this every time, and, like you
mentioned, use the partition id for assigning and sacrifice work balance
between subtasks. Perhaps we could make it an option in the connector, but
I’m in doubt if it is reasonable for users to sacrifice work balance and
initial throughput for the excessive streaming shuffle. Flink provides a
lot of ways for key extraction on data streams, so for complex topologies
there will most likely be shuffle downstream anyways.

Regards,
Gordon


[1]
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks


On June 21, 2016 at 4:02:01 AM, Michal Hariš (michal.harish@gmail.com)
wrote:

Hi Tai, I was referring to co-partitioning, not co-location of leaders,
i.e. multiple topics that share the same partitioning scheme. By example,
say I have 2 topics which share the same keyspace and which are produced by
something other than Flink using identical partitioner. The data in these 2
topics is already co-partitioned and shouldn't require any shuffle for
aggregations by key. From the partition assignment code and your
explanation (and the docs) I understand that the kafka connector will
assign the partitions to operator subtasks by iterating over the list,
thereby most likely breaking the existing co-partitioning, and so even if I
provide partitionCustom with the exact same code that was used to partition
the data in the external producer, any aggregation by the existing message
key will have to still incur the unnecessary shuffle. Correct ? Or does
partitionCustom on the data stream somehow override the behaviour of
assignPartitions on the source ?


On Mon, Jun 20, 2016 at 11:01 AM, Tai Gordon <tz...@gmail.com> wrote:

> Hi Michal,
>
> Whether or not the external system's partitioning scheme is referenced
when
> assigning
> partitions to the consumer parallel subtasks depends on the
implementation
> of each connector / source.
>
> First, clarification on “co-partitioning": from your context I’m assuming
> you’re referring to co-location of Kafka partition leaders? If so, as you
> correctly identified, the current Kafka consumer connecter does not take
> the co-location of Kafka partitions into account when assigning
partitions.
> To exploit this, though, even if the deterministic assignment takes
leader
> location into account, subtasks will also need to be co-located with the
> leaders. As far as I know, there’s currently no way for subtask’s to
access
> their location info at runtime.
> On the other hand, if you’re referring to “co-partitioning the data"
> consumed from Kafka, you can use a custom partitioner on the consumed
data
> stream:
>
>
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/#physical-partitioning
> .
>
> Hope this helps!
>
> Regards,
> Gordon
>
> On June 16, 2016 at 7:50:05 PM, Michal Hariš (michal.harish@gmail.com)
> wrote:
>
> Hi, I was recently looking into a kafka connector issue (FLINK-4023 /
> FLINK-4069), when it was pointed out that partition assignment will not
be
> deterministic if the partition discovery is imply moved to the open()
> method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
> __index__ of the total list of topic-partitions subscribed to is used. It
> is clear that calling it from the open() method in each task can produce
> different lists and so some partitions can be consumed multiple times
while
> others not consumed at all. In a related discussion it was suggested to
> build this list in a deterministic way so that each partitions sees the
> same index for the same topic-partition. This would work for the issues
> above, but it highlighted for me another issue which relates to partition
> assignment itself - hence starting a different thread.
>
> It don't understand at this point the way how Flink does co-group on
> multiple topics but having worked in the Kafka zone for a number of
years,
> ignoring the physical partition id which is deterministic at the Kafka
> cluster level, and using a transient list (even if it is constructed
> deterministically) means that co-partitioning cannot be exploited for a
> straight co-group and Flink has to always do its own shuffle. I think
using
> getPartition() on each topic-partition instead of list index in the
> assignPartition is necessary, even if it may result in an unbalanced work
> distribution among Flink consumer instances. But it seems to me that in
> Flink, the partitioning schemes that exist outside its runtime are
ignored.
> Is it because any source outside Flink's realm is treated as to be
imported
> and no partitioning is assumed for simplicity/control or is it because
this
> is expected to produce even load-balancing of work? What else am I
missing?
>
> Michal
>

Re: kafka partition assignment

Posted by Michal Hariš <mi...@gmail.com>.
Hi Tai, I was referring to co-partitioning, not co-location of leaders,
i.e. multiple topics that share the same partitioning scheme. By example,
say I have 2 topics which share the same keyspace and which are produced by
something other than Flink using identical partitioner. The data in these 2
topics is already co-partitioned and shouldn't require any shuffle for
aggregations by key. From the partition assignment code and your
explanation (and the docs) I understand that the kafka connector will
assign the partitions to operator subtasks by iterating over the list,
thereby most likely breaking the existing co-partitioning, and so even if I
provide partitionCustom with the exact same code that was used to partition
the data in the external producer, any aggregation by the existing message
key will have to still incur the unnecessary shuffle. Correct ? Or does
partitionCustom on the data stream somehow override the behaviour of
assignPartitions on the source ?


On Mon, Jun 20, 2016 at 11:01 AM, Tai Gordon <tz...@gmail.com> wrote:

> Hi Michal,
>
> Whether or not the external system's partitioning scheme is referenced when
> assigning
> partitions to the consumer parallel subtasks depends on the implementation
> of each connector / source.
>
> First, clarification on “co-partitioning": from your context I’m assuming
> you’re referring to co-location of Kafka partition leaders? If so, as you
> correctly identified, the current Kafka consumer connecter does not take
> the co-location of Kafka partitions into account when assigning partitions.
> To exploit this, though, even if the deterministic assignment takes leader
> location into account, subtasks will also need to be co-located with the
> leaders. As far as I know, there’s currently no way for subtask’s to access
> their location info at runtime.
> On the other hand, if you’re referring to “co-partitioning the data"
> consumed from Kafka, you can use a custom partitioner on the consumed data
> stream:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/#physical-partitioning
> .
>
> Hope this helps!
>
> Regards,
> Gordon
>
> On June 16, 2016 at 7:50:05 PM, Michal Hariš (michal.harish@gmail.com)
> wrote:
>
> Hi, I was recently looking into a kafka connector issue (FLINK-4023 /
> FLINK-4069), when it was pointed out that partition assignment will not be
> deterministic if the partition discovery is imply moved to the open()
> method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
> __index__ of the total list of topic-partitions subscribed to is used. It
> is clear that calling it from the open() method in each task can produce
> different lists and so some partitions can be consumed multiple times while
> others not consumed at all. In a related discussion it was suggested to
> build this list in a deterministic way so that each partitions sees the
> same index for the same topic-partition. This would work for the issues
> above, but it highlighted for me another issue which relates to partition
> assignment itself - hence starting a different thread.
>
> It don't understand at this point the way how Flink does co-group on
> multiple topics but having worked in the Kafka zone for a number of years,
> ignoring the physical partition id which is deterministic at the Kafka
> cluster level, and using a transient list (even if it is constructed
> deterministically) means that co-partitioning cannot be exploited for a
> straight co-group and Flink has to always do its own shuffle. I think using
> getPartition() on each topic-partition instead of list index in the
> assignPartition is necessary, even if it may result in an unbalanced work
> distribution among Flink consumer instances. But it seems to me that in
> Flink, the partitioning schemes that exist outside its runtime are ignored.
> Is it because any source outside Flink's realm is treated as to be imported
> and no partitioning is assumed for simplicity/control or is it because this
> is expected to produce even load-balancing of work? What else am I missing?
>
> Michal
>

Re: kafka partition assignment

Posted by Tai Gordon <tz...@gmail.com>.
Hi Michal,

Whether or not the external system's partitioning scheme is referenced when
assigning
partitions to the consumer parallel subtasks depends on the implementation
of each connector / source.

First, clarification on “co-partitioning": from your context I’m assuming
you’re referring to co-location of Kafka partition leaders? If so, as you
correctly identified, the current Kafka consumer connecter does not take
the co-location of Kafka partitions into account when assigning partitions.
To exploit this, though, even if the deterministic assignment takes leader
location into account, subtasks will also need to be co-located with the
leaders. As far as I know, there’s currently no way for subtask’s to access
their location info at runtime.
On the other hand, if you’re referring to “co-partitioning the data"
consumed from Kafka, you can use a custom partitioner on the consumed data
stream:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/#physical-partitioning
.

Hope this helps!

Regards,
Gordon

On June 16, 2016 at 7:50:05 PM, Michal Hariš (michal.harish@gmail.com)
wrote:

Hi, I was recently looking into a kafka connector issue (FLINK-4023 /
FLINK-4069), when it was pointed out that partition assignment will not be
deterministic if the partition discovery is imply moved to the open()
method. In the assignPartitions of FlinkKafkaConsumerBase a modulo on the
__index__ of the total list of topic-partitions subscribed to is used. It
is clear that calling it from the open() method in each task can produce
different lists and so some partitions can be consumed multiple times while
others not consumed at all. In a related discussion it was suggested to
build this list in a deterministic way so that each partitions sees the
same index for the same topic-partition. This would work for the issues
above, but it highlighted for me another issue which relates to partition
assignment itself - hence starting a different thread.

It don't understand at this point the way how Flink does co-group on
multiple topics but having worked in the Kafka zone for a number of years,
ignoring the physical partition id which is deterministic at the Kafka
cluster level, and using a transient list (even if it is constructed
deterministically) means that co-partitioning cannot be exploited for a
straight co-group and Flink has to always do its own shuffle. I think using
getPartition() on each topic-partition instead of list index in the
assignPartition is necessary, even if it may result in an unbalanced work
distribution among Flink consumer instances. But it seems to me that in
Flink, the partitioning schemes that exist outside its runtime are ignored.
Is it because any source outside Flink's realm is treated as to be imported
and no partitioning is assumed for simplicity/control or is it because this
is expected to produce even load-balancing of work? What else am I missing?

Michal