You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Paolo Patierno <pp...@live.com> on 2016/04/21 15:41:44 UTC

No error for assigning to not existing partition

Hello,

I have a topic with 4 partitions and I'm using the assign() method of KafkaConsumer for receiving from a specified partition.
For testing, I tried to specify a not existing partition (number 5) but I had a strange trace and behavior instead of an error/exception related to not existing partition.
After the assing() call I have the following ...
...
2016-04-21 15:31:26,355 [                      Thread-5] KafkaConsumer                  DEBUG Subscribed to partition(s): my_topic-5
...
and then a lot of retries to get partition information offset ...

2016-04-21 15:31:26,482 [                      Thread-5] ConsumerCoordinator            DEBUG No committed offset for partition my_topic-5
2016-04-21 15:31:26,482 [                      Thread-5] Fetcher                        DEBUG Resetting offset for partition my_topic-5 to latest offset.
2016-04-21 15:31:26,483 [                      Thread-5] Fetcher                        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,571 [                      Thread-5] NetworkClient                  DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=4,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486571, sendTimeMs=0) to node 0
2016-04-21 15:31:26,573 [                      Thread-5] Metadata                       DEBUG Updated cluster metadata version 3 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,573 [                      Thread-5] Fetcher                        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,672 [                      Thread-5] NetworkClient                  DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=5,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486672, sendTimeMs=0) to node 0
2016-04-21 15:31:26,674 [                      Thread-5] Metadata                       DEBUG Updated cluster metadata version 4 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,674 [                      Thread-5] Fetcher                        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,773 [                      Thread-5] NetworkClient                  DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=6,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486773, sendTimeMs=0) to node 0
2016-04-21 15:31:26,775 [                      Thread-5] Metadata                       DEBUG Updated cluster metadata version 5 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,775 [                      Thread-5] Fetcher                        DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,874 [                      Thread-5] NetworkClient                  DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=7,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486874, sendTimeMs=0) to node 0
2016-04-21 15:31:26,876 [                      Thread-5] Metadata                       DEBUG Updated cluster metadata version 6 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])

Why don't an error is returned for a not existing partition ? 

Do I have to check by myself calling partitionsFor() method before trying the assign() ?

Thanks,

Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor 
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience