Posted to user@flink.apache.org by yunfan123 <yu...@foxmail.com> on 2017/11/08 07:35:33 UTC

What happens if my parallelism is greater than the number of Kafka partitions?

Does that mean the same partition's data will be consumed multiple times?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: What happens if my parallelism is greater than the number of Kafka partitions?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the index of the target subtask for a given Kafka partition.
The implementation in that method ensures that the same subtask index will always be returned for the same partition.

Each consumer subtask locally invokes this assignment method for every Kafka partition.
If the returned index doesn’t equal the subtask’s own index, that partition is filtered out and not read by that subtask.
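A minimal standalone sketch of that per-subtask filtering is shown here. It assumes a plain topic name and partition id can stand in for Flink's `KafkaTopicPartition`; `assign` copies the arithmetic of the snippet quoted elsewhere in this thread, while the class name, `partitionsForSubtask`, and the topic name are hypothetical and not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: NOT Flink's KafkaTopicPartitionAssigner, only the same arithmetic.
public class SubtaskFilterSketch {

    // Same formula as the quoted assign(): a start index derived from the topic
    // hash, then partition ids used as a clockwise offset from that start index.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Hypothetical helper: what one subtask keeps after running assign()
    // locally over every partition and comparing the result to its own index.
    static List<Integer> partitionsForSubtask(String topic, int numPartitions,
                                              int subtaskIndex, int numParallelSubtasks) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (assign(topic, p, numParallelSubtasks) == subtaskIndex) {
                mine.add(p);
            }
            // Otherwise the partition is filtered out and never read by this subtask.
        }
        return mine;
    }

    public static void main(String[] args) {
        String topic = "example-topic"; // hypothetical topic name
        int numPartitions = 3;
        int parallelism = 5;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> "
                    + partitionsForSubtask(topic, numPartitions, i, parallelism));
        }
    }
}
```

With 3 partitions and parallelism 5, three subtasks each end up with exactly one partition and two end up with an empty list; which subtasks are idle depends on the topic's hash code. No partition appears in more than one list, because every subtask evaluates the same deterministic function.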

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfighting@foxmail.com) wrote:

It seems this will assign to multiple subtasks.
I wonder how Flink ensures that some subtasks simply remain idle.




Re: What happens if my parallelism is greater than the number of Kafka partitions?

Posted by yunfan123 <yu...@foxmail.com>.
The Kafka partition assignment code is as follows:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
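To see what the quoted method computes, here is a standalone re-creation of the same arithmetic, assuming the topic name and partition id are passed directly instead of a Flink `KafkaTopicPartition` (the class and method names are hypothetical). It shows that the method returns a single int per partition, and that consecutive partitions land on consecutive subtasks, wrapping around modulo the parallelism.

```java
// Hypothetical standalone re-creation of the quoted assign() arithmetic.
public class AssignSketch {

    static int startIndex(String topic, int numParallelSubtasks) {
        // Masking off the sign bit keeps the modulo result non-negative,
        // even when hashCode() * 31 overflows.
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    }

    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Partition ids start at 0, so they act as a clockwise offset from the start index.
        return (startIndex(topic, numParallelSubtasks) + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "example-topic"; // hypothetical topic name
        int parallelism = 5;
        System.out.println("start index: " + startIndex(topic, parallelism));
        for (int p = 0; p < 3; p++) {
            // Exactly one subtask index per partition, never several.
            System.out.println("partition " + p + " -> subtask " + assign(topic, p, parallelism));
        }
    }
}
```

The result depends only on the topic, the partition id, and the parallelism, so every subtask that calls it for a given partition gets the same answer.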

It seems this will assign to multiple subtasks.
I wonder how Flink ensures that some subtasks simply remain idle.




Re: What happens if my parallelism is greater than the number of Kafka partitions?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi!

You can set the parallelism of the Flink Kafka Consumer independently of the number of partitions.
If there are more consumer subtasks than Kafka partitions to read (i.e. when the parallelism of the consumer is set higher than the number of partitions), some subtasks will simply remain idle.
Each Kafka partition is deterministically assigned to a single consumer subtask.
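The number of idle subtasks follows from the assignment formula: partitions 0..P-1 map to consecutive indices modulo the parallelism N, so they hit P distinct subtasks when P <= N (leaving N - P idle) and cover every subtask when P >= N. A small sketch that counts this, reusing the arithmetic of the snippet quoted in this thread (class and method names are hypothetical, not Flink API):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: counts subtasks that receive no partition.
public class IdleSubtaskSketch {

    // Same arithmetic as the quoted assign(), with topic and partition passed directly.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // A subtask is "busy" if at least one partition maps to its index.
    static int countIdleSubtasks(String topic, int numPartitions, int parallelism) {
        Set<Integer> busy = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            busy.add(assign(topic, p, parallelism));
        }
        return parallelism - busy.size();
    }

    public static void main(String[] args) {
        String topic = "example-topic"; // hypothetical topic name
        int numPartitions = 4;
        for (int parallelism = 1; parallelism <= 8; parallelism++) {
            System.out.println("partitions=" + numPartitions + ", parallelism=" + parallelism
                    + " -> idle subtasks: " + countIdleSubtasks(topic, numPartitions, parallelism));
        }
    }
}
```

For 4 partitions this reports 0 idle subtasks for parallelism 1 through 4, then 1, 2, 3 and 4 for parallelism 5 through 8, i.e. max(0, parallelism - partitions), regardless of the topic name.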

Cheers,
Gordon


On 8 November 2017 at 4:21:54 PM, yunfan123 (yunfanfighting@foxmail.com) wrote:

Does that mean the same partition's data will be consumed multiple times?


