You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Shi Quan <qu...@outlook.com> on 2019/06/13 02:32:52 UTC

答复: Re:flink kafka source在并行分布式下是怎么确定一个subtask消费哪个kafka partition的?

1.     会运行在每个subtask中;

2.     可以获取多个KTP

3.     代码请阅AbstractPartitionDiscorver中的setAndCheckDiscoveredPartition方法和KafkaTopicPartitionAssigner中的assign方法。大体来说会根据并行度(subtask总数)、subtask index、topic partiion来具体分配。



发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用



________________________________
发件人: 张的像恐龙 <ys...@163.com>
发送时间: Wednesday, June 12, 2019 5:59:05 PM
收件人: user-zh@flink.apache.org
主题: Re:flink kafka source在并行分布式下是怎么确定一个subtask消费哪个kafka partition的?

从目前的使用看,flink是一个线程对应一个topic的partition,不知道能不能一个线程可以消费多个partition 数据或多个线程消费一个 partition









在 2019-05-23 17:56:35,"junming liu" <yu...@hotmail.com> 写道:
>Hi All,
>
>我们写kafka comsumer通常都不需要去管消费哪个partition,comsumer会根据partition.assignment.strategy设置的分配策略自动协商分配每个线程消费哪个或者哪些分区
>
>但在FlinkKafkaConsumer中调用的KafkaConsumerThread这个消费线程代码中有如下代码
>
>
>try {
>   if (hasAssignedPartitions) {
>      newPartitions = unassignedPartitionsQueue.pollBatch();
>   }
>   else {
>      // if no assigned partitions block until we get at least one
>      // instead of hot spinning this loop. We rely on a fact that
>      // unassignedPartitionsQueue will be closed on a shutdown, so
>      // we don't block indefinitely
>      newPartitions = unassignedPartitionsQueue.getBatchBlocking();
>   }
>   if (newPartitions != null) {
>      reassignPartitions(newPartitions);
>   }
>} catch (AbortedReassignmentException e) {
>   continue;
>}
>
>我们先假设我们设置的并行度和kafka partition数是相等情况下来看以下问题,正常情况应该是一个subtask对应消费一个partition,并关注新程序首次运行初始化时的情况
>
>问题1:KafkaConsumerThread这个线程是不是会运行在每个subtask里面?
>
>问题2:默认新程序初始化时应该会执行unassignedPartitionsQueue.getBatchBlocking(),这里一个subtask是不是有可能获取多个partition的情况?它是怎么确保只获取一个分区的?
>
>问题3:感觉我的理解有问题,能大概基于代码讲讲每个subtask确定消费哪个分区的流程吗?
>
>非常感谢!!
>
>Best,
>YunKillerE