Posted to user@flink.apache.org by aitozi <gj...@gmail.com> on 2017/08/02 13:51:23 UTC
KafkaConsumerBase
Hi,
I have a question about KafkaConsumerBase. When we use it, we have to
fetch data from different partitions in different parallel threads, as in
the method shown in KafkaConsumerBase.java (version 1.2.0):
    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers, int consumerIndex) {
        final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }
However, I have not found any place that invokes this method. In
KafkaConsumerThread.java, the following is used:
    consumerCallBridge.assignPartitions(consumer,
            convertKafkaPartitions(subscribedPartitions));
I think subscribedPartitions here is all the partitions, not just the
subtask's partitions. Can anyone help me with this?
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: KafkaConsumerBase
Posted by aitozi <gj...@gmail.com>.
Hi Gordon,
Yes, I just read the code in the assignTopicPartitions method again, and it
does indeed subscribe only to the partitions that the subtask should
subscribe to. Earlier I had not carefully read the for loop in
assignTopicPartitions that builds subscribedPartitions for each subtask:
    for (int i = getRuntimeContext().getIndexOfThisSubtask();
            i < kafkaTopicPartitions.size();
            i += getRuntimeContext().getNumberOfParallelSubtasks()) {
        subscribedPartitions.add(kafkaTopicPartitions.get(i));
    }
You are right: "the partitions are still filtered out to only be the
partitions for each local subtask, using the `assignTopicPartitions` method".
Thanks
aitozi
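The two loops quoted in this thread (the modulo filter in `assignPartitions` and the strided loop in `assignTopicPartitions`) appear to select the same elements for a given subtask. The following is a minimal standalone sketch of that comparison, not Flink code: the class name `PartitionLoopComparison` and the method names `byModulo` and `byStride` are hypothetical, and plain integers stand in for `KafkaTopicPartition` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Flink.
// Integers stand in for KafkaTopicPartition objects.
final class PartitionLoopComparison {

    // Mirrors the modulo filter from the quoted 1.2.0 `assignPartitions` snippet.
    static List<Integer> byModulo(List<Integer> allPartitions, int numConsumers, int consumerIndex) {
        List<Integer> result = new ArrayList<>(allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                result.add(allPartitions.get(i));
            }
        }
        return result;
    }

    // Mirrors the strided loop quoted from `assignTopicPartitions`:
    // start at the subtask index and step by the number of subtasks.
    static List<Integer> byStride(List<Integer> allPartitions, int numSubtasks, int subtaskIndex) {
        List<Integer> result = new ArrayList<>();
        for (int i = subtaskIndex; i < allPartitions.size(); i += numSubtasks) {
            result.add(allPartitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < 7; p++) {
            partitions.add(p);
        }
        int parallelism = 3; // assumed value, for illustration only
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<Integer> a = byModulo(partitions, parallelism, subtask);
            List<Integer> b = byStride(partitions, parallelism, subtask);
            System.out.println("subtask " + subtask + ": " + b + " same=" + a.equals(b));
        }
    }
}
```

With 7 partitions and a parallelism of 3, subtask 0 ends up with the elements at indices 0, 3 and 6 under both loops.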
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636p14642.html
Re: KafkaConsumerBase
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi!
> method shown in KafkaConsumerBase.java (version 1.2.0)
A lot has changed in the FlinkKafkaConsumerBase since version 1.2.0.
And if I remember correctly, the `assignPartitions` method was a leftover that was no longer used anywhere in the code, and it was removed afterwards.
The method for partition assigning in 1.2.0 is called `assignTopicPartitions`, and is used in the open() method.
> consumerCallBridge.assignPartitions(consumer,
> convertKafkaPartitions(subscribedPartitions));
> i think here subscribedPartitions is all the partitions , not
> subtaskPartitions.
This code snippet is from `KafkaConsumerThread`, correct?
As stated above, the partitions are still filtered out to only be the partitions for each local subtask, using the `assignTopicPartitions` method. So here, the `subscribedPartitions` is not the complete list of partitions, only the partitions that the subtask should subscribe to.
Cheers,
Gordon
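To illustrate the point that each subtask only subscribes to its own share, here is a small standalone sketch that checks whether a strided assignment hands every partition to exactly one subtask. It is not Flink code: `SubtaskCoverageCheck`, `partitionsForSubtask` and `eachPartitionAssignedOnce` are hypothetical names, strings stand in for `KafkaTopicPartition`, and the input list is assumed to contain no duplicates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class, not part of Flink.
// Strings stand in for KafkaTopicPartition objects.
final class SubtaskCoverageCheck {

    // Strided selection: every numSubtasks-th partition, starting at subtaskIndex.
    static List<String> partitionsForSubtask(List<String> all, int numSubtasks, int subtaskIndex) {
        List<String> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < all.size(); i += numSubtasks) {
            mine.add(all.get(i));
        }
        return mine;
    }

    // Returns true if no partition is handed out twice and none is left out.
    // Assumes the entries of `all` are distinct.
    static boolean eachPartitionAssignedOnce(List<String> all, int numSubtasks) {
        Set<String> seen = new HashSet<>();
        int assigned = 0;
        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            for (String partition : partitionsForSubtask(all, numSubtasks, subtask)) {
                if (!seen.add(partition)) {
                    return false; // the same partition went to two subtasks
                }
                assigned++;
            }
        }
        return assigned == all.size();
    }

    public static void main(String[] args) {
        List<String> all = new ArrayList<>();
        for (int p = 0; p < 5; p++) {
            all.add("topic-" + p); // made-up partition labels
        }
        System.out.println(partitionsForSubtask(all, 2, 0));
        System.out.println(partitionsForSubtask(all, 2, 1));
        System.out.println(eachPartitionAssignedOnce(all, 2));
    }
}
```

One consequence visible in this sketch: when there are more subtasks than partitions, the subtasks with the highest indices receive an empty list.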