You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Paolo Patierno <pp...@live.com> on 2016/04/21 15:41:44 UTC
No error for assigning to not existing partition
Hello,
I have a topic with 4 partitions and I'm using the assign() method of KafkaConsumer for receiving from a specified partition.
For testing, I tried to specify a not existing partition (number 5) but I had a strange trace and behavior instead of an error/exception related to not existing partition.
After the assing() call I have the following ...
...
2016-04-21 15:31:26,355 [ Thread-5] KafkaConsumer DEBUG Subscribed to partition(s): my_topic-5
...
and then a lot of retries to get partition information offset ...
2016-04-21 15:31:26,482 [ Thread-5] ConsumerCoordinator DEBUG No committed offset for partition my_topic-5
2016-04-21 15:31:26,482 [ Thread-5] Fetcher DEBUG Resetting offset for partition my_topic-5 to latest offset.
2016-04-21 15:31:26,483 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,571 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=4,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486571, sendTimeMs=0) to node 0
2016-04-21 15:31:26,573 [ Thread-5] Metadata DEBUG Updated cluster metadata version 3 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,573 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,672 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=5,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486672, sendTimeMs=0) to node 0
2016-04-21 15:31:26,674 [ Thread-5] Metadata DEBUG Updated cluster metadata version 4 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,674 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,773 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=6,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486773, sendTimeMs=0) to node 0
2016-04-21 15:31:26,775 [ Thread-5] Metadata DEBUG Updated cluster metadata version 5 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
2016-04-21 15:31:26,775 [ Thread-5] Fetcher DEBUG Partition my_topic-5 is unknown for fetching offset, wait for metadata refresh
2016-04-21 15:31:26,874 [ Thread-5] NetworkClient DEBUG Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=7,client_id=consumer-1}, body={topics=[my_topic]}), isInitiatedByNetworkClient, createdTimeMs=1461245486874, sendTimeMs=0) to node 0
2016-04-21 15:31:26,876 [ Thread-5] Metadata DEBUG Updated cluster metadata version 6 to Cluster(nodes = [Node(0, localhost.localdomain, 9092)], partitions = [Partition(topic = my_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 1, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = my_topic, partition = 3, leader = 0, replicas = [0,], isr = [0,]])
Why don't an error is returned for a not existing partition ?
Do I have to check by myself calling partitionsFor() method before trying the assign() ?
Thanks,
Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience