Posted to issues@camel.apache.org by "Otavio Rodolfo Piske (Jira)" <ji...@apache.org> on 2022/10/08 05:57:00 UTC
[jira] [Commented] (CAMEL-18327) camel-kafka: Kafka consumer closes when it is paused
[ https://issues.apache.org/jira/browse/CAMEL-18327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614366#comment-17614366 ]
Otavio Rodolfo Piske commented on CAMEL-18327:
----------------------------------------------
We need to backport this one, so keeping it open until Monday, when I'll do it.
> camel-kafka: Kafka consumer closes when it is paused
> ----------------------------------------------------
>
> Key: CAMEL-18327
> URL: https://issues.apache.org/jira/browse/CAMEL-18327
> Project: Camel
> Issue Type: Task
> Components: camel-kafka
> Affects Versions: 3.17.0
> Reporter: rupam
> Assignee: Otavio Rodolfo Piske
> Priority: Major
> Fix For: 3.18.3, 3.20.0
>
>
>
> In the startPolling() method, if the consumer is suspending, the polling loop ends and the unsubscribe method (safeUnsubscribe()) is called, which permanently closes the consumer.
>
> {code:java}
> Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
> while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
>     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
>     if (consumerListener != null) {
>         if (!consumerListener.afterConsume(consumer)) {
>             continue;
>         }
>     }
>     ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
>     if (result.isBreakOnErrorHit()) {
>         LOG.debug("We hit an error ... setting flags to force reconnect");
>         // force re-connect
>         setReconnect(true);
>         setConnected(false);
>     }
>     updateTaskState();
> }
> if (!isConnected()) {
>     LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
>     commitManager.commit();
> }
> safeUnsubscribe();
> } catch (InterruptException e) {
> {code}
>
>
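The behavior the description asks for can be sketched without the Kafka client. This is only an illustration under stated assumptions, not the fix that was committed: the State enum, FakeConsumer, and run() are hypothetical names, and the fake consumer merely records which calls were made. The sketch also assumes that a paused consumer keeps being polled so that it stays in its group; it shows a loop that pauses on suspend and unsubscribes only on stop.

```java
import java.util.ArrayList;
import java.util.List;

public class PauseAwarePollLoop {

    /** Hypothetical lifecycle states; not Camel's actual state model. */
    enum State { RUNNING, SUSPENDED, STOPPED }

    /** Hypothetical stand-in for a Kafka consumer; it only records the calls made on it. */
    static class FakeConsumer {
        final List<String> calls = new ArrayList<>();
        void poll() { calls.add("poll"); }
        void pause() { calls.add("pause"); }
        void resume() { calls.add("resume"); }
        void unsubscribe() { calls.add("unsubscribe"); }
    }

    /**
     * Runs one loop iteration per scripted state and returns the recorded calls.
     * SUSPENDED pauses the consumer but keeps the subscription; only STOPPED
     * leaves the loop and reaches unsubscribe().
     */
    static List<String> run(List<State> script) {
        FakeConsumer consumer = new FakeConsumer();
        boolean paused = false;
        for (State state : script) {
            if (state == State.STOPPED) {
                break; // the only exit that should lead to unsubscribe()
            }
            if (state == State.SUSPENDED && !paused) {
                consumer.pause();
                paused = true;
            } else if (state == State.RUNNING && paused) {
                consumer.resume();
                paused = false;
            }
            // Assumption: polling continues while paused, so the consumer stays alive.
            consumer.poll();
        }
        consumer.unsubscribe();
        return consumer.calls;
    }

    public static void main(String[] args) {
        List<State> script = List.of(
                State.RUNNING, State.SUSPENDED, State.SUSPENDED, State.RUNNING, State.STOPPED);
        // prints [poll, pause, poll, poll, resume, poll, unsubscribe]
        System.out.println(run(script));
    }
}
```

In the quoted snippet, by contrast, leaving the while loop for any reason falls through to safeUnsubscribe(), so a suspend is handled the same way as a stop.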
--
This message was sent by Atlassian Jira
(v8.20.10#820010)