Posted to issues@camel.apache.org by "Rafał Gała (JIRA)" <ji...@apache.org> on 2017/11/24 06:56:00 UTC
[jira] [Updated] (CAMEL-12031) KafkaConsumer stops consuming messages when exception occurs during offset commit
[ https://issues.apache.org/jira/browse/CAMEL-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rafał Gała updated CAMEL-12031:
-------------------------------
Description:
When message processing takes longer than the maximum session timeout, the consumer thread ends after receiving an *org.apache.kafka.clients.consumer.CommitFailedException*.
{code:java}
@Override
public void run() {
    boolean first = true;
    boolean reConnect = true;
    while (reConnect) {
        // create consumer
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // Kafka uses reflection for loading authentication settings, use its classloader
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
        } finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
        if (!first) {
            // skip one poll timeout before trying again
            long delay = endpoint.getConfiguration().getPollTimeoutMs();
            log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        first = false;
        // doRun keeps running until we either shutdown or is told to re-connect
        reConnect = doRun();
    }
}
{code}
In this case the *doRun()* method returns false, so the loop in *run()* ends and the consumer thread stops. It should be possible to let the process continue after a failed offset commit.
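One possible way to get the requested behaviour is for *doRun()* to treat a failed commit as a reason to re-connect rather than to stop. The following is only a minimal sketch of that idea, not the actual camel-kafka code: the class *ReconnectSketch*, the *PollCycle* interface and the event list are hypothetical, and the nested *CommitFailedException* is a stand-in for the Kafka class of the same name so that the example compiles without the Kafka client on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

class ReconnectSketch {

    /** Stand-in (assumption) for org.apache.kafka.clients.consumer.CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical abstraction of one poll / process / commit cycle. */
    interface PollCycle {
        /** Returns true while the consumer should keep polling, false on shutdown. */
        boolean pollProcessCommit();
    }

    /**
     * Runs poll cycles until shutdown or until a commit fails.
     * Returns true if the caller should re-connect, false if it should stop.
     */
    static boolean doRun(PollCycle cycle, List<String> events) {
        try {
            while (cycle.pollProcessCommit()) {
                // keep polling
            }
            events.add("shutdown");
            return false;
        } catch (CommitFailedException e) {
            // A failed commit is reported as "re-connect" instead of ending the thread.
            events.add("commit failed: " + e.getMessage());
            return true;
        }
    }

    /** Same loop shape as run() above, reduced to the re-connect decision. */
    static List<String> run(PollCycle cycle) {
        List<String> events = new ArrayList<>();
        boolean reConnect = true;
        while (reConnect) {
            events.add("connect"); // stands in for creating a new KafkaConsumer
            reConnect = doRun(cycle, events);
        }
        return events;
    }
}
```

With this shape, a commit failure leads to one more pass through the outer loop, so a fresh consumer is created and polling resumes. Whether re-connecting is the right recovery, and whether it should be configurable, is left open here.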
was:
When processing of messages takes longer than max session timeout, the consumer thread will and after receiving the *org.apache.kafka.clients.consumer.CommitFailedException*.
{code:java}
@Override
public void run() {
boolean first = true;
boolean reConnect = true;
while (reConnect) {
// create consumer
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
try {
// Kafka uses reflection for loading authentication settings, use its classloader
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
}
if (!first) {
// skip one poll timeout before trying again
long delay = endpoint.getConfiguration().getPollTimeoutMs();
log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
first = false;
// doRun keeps running until we either shutdown or is told to re-connect
reConnect = doRun();
}
}
{code}
The *doRun()* method returns false and the loop ends. It should be possible to let the proces continue after failed offset commit.
> KafkaConsumer stops consuming messages when exception occurs during offset commit
> ---------------------------------------------------------------------------------
>
> Key: CAMEL-12031
> URL: https://issues.apache.org/jira/browse/CAMEL-12031
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Reporter: Rafał Gała
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)