Posted to issues@camel.apache.org by "rupam (Jira)" <ji...@apache.org> on 2022/09/18 23:13:00 UTC

[jira] [Reopened] (CAMEL-18327) camel-kafka: Kafka consumer closes when it is paused

     [ https://issues.apache.org/jira/browse/CAMEL-18327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

rupam reopened CAMEL-18327:
---------------------------
    Estimated Complexity: Novice  (was: Unknown)

The consumer.resume() method resumes from the next available offset and does not attempt to send the message at the current offset.
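
To illustrate the behaviour described above, here is a minimal sketch using a toy in-memory consumer. ToyConsumer and all of its methods are hypothetical stand-ins written for this example; they are not the Kafka client or Camel API. The sketch assumes that a poll advances the position past the returned record, so a plain resume continues with the following record. In the real client, KafkaConsumer.seek(TopicPartition, long) is the call that moves the position back; whether camel-kafka should do so on resume is an assumption here, not something this comment settles.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition; NOT the Kafka client API. */
public class ResumeOffsetSketch {

    /** Hypothetical stand-in for a consumer reading one partition. */
    static final class ToyConsumer {
        private final List<String> log;
        private long position = 0;
        private boolean paused = false;

        ToyConsumer(List<String> log) {
            this.log = log;
        }

        /** Returns one record and advances the position, or null when paused or at the end. */
        String poll() {
            if (paused || position >= log.size()) {
                return null;
            }
            return log.get((int) position++);
        }

        void pause() { paused = true; }

        void resume() { paused = false; }

        void seek(long offset) { position = offset; }

        long position() { return position; }
    }

    /** Returns the records delivered after a pause/resume cycle that follows a failed record. */
    static List<String> afterResume(boolean seekBack) {
        ToyConsumer consumer = new ToyConsumer(List.of("m0", "m1", "m2"));
        consumer.poll();                          // m0 is processed successfully
        long failedOffset = consumer.position();  // offset of the record fetched next
        consumer.poll();                          // m1 is fetched, assume processing fails
        consumer.pause();
        consumer.resume();
        if (seekBack) {
            consumer.seek(failedOffset);          // rewind so m1 is fetched again
        }
        List<String> delivered = new ArrayList<>();
        String record;
        while ((record = consumer.poll()) != null) {
            delivered.add(record);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("resume only:   " + afterResume(false));
        System.out.println("seek + resume: " + afterResume(true));
    }
}
```

In this model, resuming without a seek skips the failed record, while seeking back to the failed offset first delivers it again.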

> camel-kafka: Kafka consumer closes when it is paused
> ----------------------------------------------------
>
>                 Key: CAMEL-18327
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18327
>             Project: Camel
>          Issue Type: Task
>          Components: camel-kafka
>    Affects Versions: 3.17.0
>            Reporter: rupam
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.18.3, 3.19.0
>
>
>  
> In the startPolling() method, if the consumer is suspending, the unsubscribe method (safeUnsubscribe() in the excerpt below) is called, which permanently closes the consumer.
>  
> {code:java}
>     Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
>     while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
>         ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
>         if (consumerListener != null) {
>             if (!consumerListener.afterConsume(consumer)) {
>                 continue;
>             }
>         }
>         ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
>         if (result.isBreakOnErrorHit()) {
>             LOG.debug("We hit an error ... setting flags to force reconnect");
>             // force re-connect
>             setReconnect(true);
>             setConnected(false);
>         }
>         updateTaskState();
>     }
>     if (!isConnected()) {
>         LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
>         commitManager.commit();
>     }
>     safeUnsubscribe();
> } catch (InterruptException e) {
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)