Posted to issues@camel.apache.org by "rupam (Jira)" <ji...@apache.org> on 2022/09/08 03:32:00 UTC

[jira] [Commented] (CAMEL-18327) camel-kafka: Kafka consumer closes when it is paused

    [ https://issues.apache.org/jira/browse/CAMEL-18327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601610#comment-17601610 ] 

rupam commented on CAMEL-18327:
-------------------------------

The isKafkaConsumerRunnable() check in the do-while loop should not test whether the consumer is in a suspended or suspending state. The consumer can then keep polling for messages; because it has been paused, each poll simply returns no records.
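
To illustrate the suggestion, here is a minimal, self-contained sketch in plain Java. It does not use the real camel-kafka or Kafka client classes: PausedPollSketch, FakeConsumer, State, isRunnable() and run() are hypothetical stand-ins invented for this example. The only assumption carried over from Kafka is that polling a paused consumer returns no records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class PausedPollSketch {

    /** Hypothetical stand-in for the lifecycle states of the consumer. */
    public enum State { RUNNING, SUSPENDED, STOPPED }

    /** Hypothetical stand-in for a Kafka consumer: poll() yields nothing while paused. */
    public static class FakeConsumer {
        private final Queue<String> pending = new ArrayDeque<>();
        private boolean paused;
        private boolean closed;

        public void add(String record) { pending.add(record); }
        public void pause() { paused = true; }
        public void resume() { paused = false; }
        public void close() { closed = true; }
        public boolean isClosed() { return closed; }

        public List<String> poll() {
            if (closed) {
                throw new IllegalStateException("consumer is closed");
            }
            if (paused || pending.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
    }

    /** Loop condition as suggested: only a real stop ends the loop, suspension does not. */
    public static boolean isRunnable(State state) {
        return state != State.STOPPED;
    }

    /** Runs one loop iteration per entry in states and returns the records processed. */
    public static List<String> run(FakeConsumer consumer, List<State> states) {
        List<String> processed = new ArrayList<>();
        for (State state : states) {
            if (!isRunnable(state)) {
                break;
            }
            if (state == State.SUSPENDED) {
                consumer.pause();
            } else {
                consumer.resume();
            }
            // While paused this adds nothing, but the loop keeps going.
            processed.addAll(consumer.poll());
        }
        // Analogous to unsubscribing once the loop has really ended.
        consumer.close();
        return processed;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.add("a");
        consumer.add("b");
        List<State> states = Arrays.asList(State.SUSPENDED, State.SUSPENDED, State.RUNNING);
        System.out.println(run(consumer, states)); // prints [a, b]
    }
}
```

In this sketch the two suspended iterations poll and get nothing back, and the records are delivered once the state returns to RUNNING. The consumer is closed only after the loop has ended, not when it is merely suspended.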

> camel-kafka: Kafka consumer closes when it is paused
> ----------------------------------------------------
>
>                 Key: CAMEL-18327
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18327
>             Project: Camel
>          Issue Type: Task
>          Components: camel-kafka
>    Affects Versions: 3.17.0
>            Reporter: rupam
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.18.3, 3.19.0
>
>
>  
> In the startPolling() method, if the consumer is suspending, the polling loop exits and the unsubscribe method is called, which permanently closes the consumer.
>  
> {code:java}
>     Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
>     while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
>         ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
>         if (consumerListener != null) {
>             if (!consumerListener.afterConsume(consumer)) {
>                 continue;
>             }
>         }
>         ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
>         if (result.isBreakOnErrorHit()) {
>             LOG.debug("We hit an error ... setting flags to force reconnect");
>             // force re-connect
>             setReconnect(true);
>             setConnected(false);
>         }
>         updateTaskState();
>     }
>     if (!isConnected()) {
>         LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
>         commitManager.commit();
>     }
>     safeUnsubscribe();
> } catch (InterruptException e) {
> {code}
>  
>  


