Posted to users@kafka.apache.org by Michał Łowicki <ml...@gmail.com> on 2021/09/03 15:57:18 UTC

How is __consumer_offsets partitioned?

Hey,

Could someone please point me to the code or docs where I can find
information about what is used as the key and how messages are spread
across partitions for this internal topic?

-- 
BR,
Michał Łowicki

Re: How is __consumer_offsets partitioned?

Posted by James Olsen <ja...@inaseq.com>.
If it's of any value to you, we use the following test to check that no two of our consumer group IDs map to the same __consumer_offsets partition. In the code, ConsumerGroups.ALL_GROUPS is simply a list of all our consumer group IDs. Spreading the offset-commit load evenly across these partitions helps level the load on your brokers. But beware of changing group IDs in an active system: you'll need to migrate carefully and prime the new offsets in the renamed group if you want to avoid message replay (depending on your config).

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerGroupsTest {

	private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupsTest.class);

	/* Default value of the broker setting offsets.topic.num.partitions; adjust if your cluster overrides it. */
	private static final int KAFKA_OFFSET_PARTITION_COUNT = 50;

	@Test
	void distinctConsumerOffsetPartitions() {
		boolean hasDuplicates = false;
		Set<Integer> usedPartitions = new HashSet<>(KAFKA_OFFSET_PARTITION_COUNT);
		for (String group : ConsumerGroups.ALL_GROUPS) {
			int partition = getConsumerOffsetPartition(group);
			hasDuplicates = !usedPartitions.add(partition) || hasDuplicates;
		}
		/*
		 * NB: It is not an absolute requirement to have no clashes. It's simply desirable.
		 */
		Assertions.assertFalse(hasDuplicates,
				"Multiple consumer groups map to the same offsets partition. See prior log output.");
	}

	private int getConsumerOffsetPartition(String group) {
		final int partition = Utils.abs(group.hashCode()) % KAFKA_OFFSET_PARTITION_COUNT;
		LOG.info("{} --> {}", group, partition);
		return partition;
	}

}
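A variant of the test above that reports which groups collide, rather than only whether any do, might look like the following minimal sketch. It assumes the default of 50 partitions and uses hypothetical group IDs in place of ConsumerGroups.ALL_GROUPS; the class name is also hypothetical, and the abs helper re-implements the behaviour of Kafka's Utils.abs so the example runs without kafka-clients on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OffsetsPartitionCollisions {

	/* Assumption: default offsets.topic.num.partitions; change it if your brokers override it. */
	static final int OFFSETS_PARTITION_COUNT = 50;

	/* Same behaviour as org.apache.kafka.common.utils.Utils.abs: Integer.MIN_VALUE maps to 0. */
	static int abs(int n) {
		return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
	}

	/* Same formula the broker uses to pick a group's __consumer_offsets partition. */
	static int partitionFor(String groupId, int partitionCount) {
		return abs(groupId.hashCode()) % partitionCount;
	}

	/* Returns partition -> group IDs, keeping only partitions shared by two or more groups. */
	static Map<Integer, List<String>> collisions(List<String> groupIds, int partitionCount) {
		Map<Integer, List<String>> byPartition = new TreeMap<>();
		for (String group : groupIds) {
			byPartition.computeIfAbsent(partitionFor(group, partitionCount), p -> new ArrayList<>())
					.add(group);
		}
		byPartition.values().removeIf(groups -> groups.size() < 2);
		return byPartition;
	}

	public static void main(String[] args) {
		/* Hypothetical group IDs, standing in for ConsumerGroups.ALL_GROUPS. */
		List<String> groups = List.of("orders-service", "billing-service", "audit-service");
		Map<Integer, List<String>> clashes = collisions(groups, OFFSETS_PARTITION_COUNT);
		if (clashes.isEmpty()) {
			System.out.println("No two groups share an offsets partition.");
		} else {
			clashes.forEach((p, g) -> System.out.println("partition " + p + " <- " + g));
		}
	}
}
```

Grouping by partition in a TreeMap keeps the report sorted by partition number, which makes it easy to see which group IDs need renaming if you decide the clash matters.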


> On 4/09/2021, at 15:30, Luke Chen <sh...@gmail.com> wrote:
> 
> Hi Michał,
> Internally, Kafka uses the *consumer group ID* as the key to decide which
> __consumer_offsets partition is used.
> The code looks like this:
> 
> `Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount`
> 
> You can check the `partitionFor` method in the GroupMetadataManager class.
> 
> Hope that helps.
> 
> Thank you.
> Luke


Re: How is __consumer_offsets partitioned?

Posted by Luke Chen <sh...@gmail.com>.
Hi Michał,
Internally, Kafka uses the *consumer group ID* as the key to decide which
__consumer_offsets partition is used.
The code looks like this:

`Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount`

You can check the `partitionFor` method in the GroupMetadataManager class.

Hope that helps.

Thank you.
Luke
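For readers working in Java, the Scala expression above can be rendered as the following minimal sketch. Here groupMetadataTopicPartitionCount is the partition count of __consumer_offsets, which comes from the broker setting offsets.topic.num.partitions (default 50); the Kafka docs advise not changing it after deployment. The class and method names are hypothetical. The kafkaAbs helper mirrors Utils.abs, which returns 0 for Integer.MIN_VALUE, whereas Math.abs returns Integer.MIN_VALUE (a negative number) for that input.

```java
final class ConsumerOffsetsPartitioner {

	/* Mirrors org.apache.kafka.common.utils.Utils.abs: Integer.MIN_VALUE becomes 0, not a negative number. */
	static int kafkaAbs(int n) {
		return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
	}

	/* Java equivalent of: Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount */
	static int partitionFor(String groupId, int groupMetadataTopicPartitionCount) {
		return kafkaAbs(groupId.hashCode()) % groupMetadataTopicPartitionCount;
	}

	public static void main(String[] args) {
		String group = "my-group"; // hypothetical group ID
		// The same group can land on a different partition if the partition count changes,
		// which is why the offsets topic's partition count should stay fixed after deployment.
		System.out.println(group + " with 50 partitions  -> " + partitionFor(group, 50));
		System.out.println(group + " with 100 partitions -> " + partitionFor(group, 100));
		// Math.abs overflows for this input and stays negative; kafkaAbs does not.
		System.out.println("Math.abs(MIN_VALUE) = " + Math.abs(Integer.MIN_VALUE));
		System.out.println("kafkaAbs(MIN_VALUE) = " + kafkaAbs(Integer.MIN_VALUE));
	}
}
```

Because String.hashCode is specified by the Java language, this computation gives the same partition the broker picks for a given group ID and partition count.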
