Posted to users@kafka.apache.org by Rajib Deb <Ra...@infosys.com> on 2020/05/03 16:51:25 UTC
Kafka - FindCoordinator error
Hi
I have written a Python consumer using the confluent-kafka package. After a few hours of running, the consumer dies with the error below:
cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response error: Local: Timed out"}
Can anyone please help me understand why this is happening?
Below is a portion of the code:
import logging
import sys

from confluent_kafka import Consumer, KafkaException, Producer

producer_conf = {
    'bootstrap.servers': 'xxxxxxx',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'xxxxx',
    'sasl.password': 'xxxx',
    'ssl.ca.location': 'xxxx',
    'ssl.certificate.location': 'xxxx',
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 1000,
    'batch.num.messages': 500
}
p = Producer(**producer_conf)
target_topic = 'xxxxxx'

# kwargs (the consumer configuration) and delivery_callback are not shown in this portion
c = Consumer(kwargs)
source_topic = 'xxxx'
c.subscribe([source_topic])

while True:
    msg = c.poll(100)  # consume from the source topic; the timeout is in seconds
    if msg is None:
        continue
    if msg.error():
        logging.error("error occurred during polling topic")
        logging.error(msg.error())
        raise KafkaException(msg.error())  # raising ends the loop, so no `continue` is needed here

    #logging.info("input msg from topic: ")
    #logging.info(msg.value())
    #msgDict = json.loads(msg.value()) # taking msg into dictionary
    try:
        # push the message from the consumed topic to the target topic
        p.produce(target_topic, msg.value(), callback=delivery_callback)
        # auto commit is disabled; commit manually once the message has been pushed to the target topic
        c.commit()
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
    except Exception as e:
        print(e)
    p.poll(0)  # serve delivery callbacks
    #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()  # reached only if the loop exits
Thanks
Rajib
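A note on the commit in the loop above: p.produce() only enqueues the message locally, and the delivery result arrives later through the callback, so calling c.commit() straight after produce() can commit an offset before the message has actually reached the target topic. Below is a minimal sketch of one way to defer the commit until delivery is confirmed. The helper name make_delivery_callback is hypothetical and not part of the posted code; the sketch assumes the consumer object exposes commit(message=..., asynchronous=...) as confluent-kafka's Consumer does.

```python
import logging


def make_delivery_callback(consumer, source_msg):
    """Build a producer delivery callback that commits source_msg on success.

    make_delivery_callback is a hypothetical helper for illustration.
    `consumer` is assumed to provide commit(message=..., asynchronous=...).
    """
    def on_delivery(err, produced_msg):
        if err is not None:
            # Delivery failed: leave the source offset uncommitted so the
            # message can be consumed again after a restart or rebalance.
            logging.error("delivery failed: %s", err)
            return
        # Delivery confirmed: now it is safe to commit the source offset.
        consumer.commit(message=source_msg, asynchronous=False)
    return on_delivery
```

In the loop this would replace the fixed callback and the c.commit() line, for example p.produce(target_topic, msg.value(), callback=make_delivery_callback(c, msg)). The callback runs inside p.poll() or p.flush(), so a synchronous commit per message trades throughput for safety; committing in batches may be preferable.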
Re: Kafka - FindCoordinator error
Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Rajib,
We can't see the args you're passing the consumer, and the error message
indicates the consumer can't find the cluster.
Thanks,
Liam Clarke-Hutchinson
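To make the two points in this reply concrete, here is a rough sketch. The consumer configuration was never posted, so every value in consumer_conf below is a placeholder and the choice of keys is an assumption about what such a consumer would need. should_keep_polling is a hypothetical helper, and treating code -185 (the _TIMED_OUT value from the reported error) as transient is also an assumption; it keeps the consumer alive through a timeout but does not explain why the coordinator lookup timed out.

```python
# Placeholder consumer configuration (assumption: the real one was not posted).
consumer_conf = {
    'bootstrap.servers': 'xxxxxxx',
    'group.id': 'xxxx',
    'enable.auto.commit': False,   # the post says auto commit is disabled
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'xxxxx',
    'sasl.password': 'xxxx',
}

# Error codes treated as transient in this sketch.
# -185 is the _TIMED_OUT value shown in the reported KafkaError.
TRANSIENT_CODES = {-185}


def should_keep_polling(err):
    """Return True if the poll loop may continue after this error.

    `err` is assumed to expose code(), as the object returned by
    msg.error() in confluent-kafka does.
    """
    return err.code() in TRANSIENT_CODES
```

In the poll loop, the error branch could then log the error and continue when should_keep_polling(msg.error()) is true, and raise KafkaException only otherwise.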
On Fri, 8 May 2020, 3:04 pm Rajib Deb, <Ra...@infosys.com> wrote:
> I wanted to check if anyone has faced this issue
>
> Thanks
> Rajib
RE: Kafka - FindCoordinator error
Posted by Rajib Deb <Ra...@infosys.com>.
I wanted to check if anyone has faced this issue
Thanks
Rajib