You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Michael J. Kitchin (JIRA)" <ji...@apache.org> on 2015/07/16 02:34:04 UTC

[jira] [Created] (CAMEL-8975) Message loss with batch commit

Michael J. Kitchin created CAMEL-8975:
-----------------------------------------

             Summary: Message loss with batch commit
                 Key: CAMEL-8975
                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 2.15.2
         Environment: Unbuntu LTS 14.x, Java 7
            Reporter: Michael J. Kitchin


These issues center around Kafka consumer (KafaConsumer.java, line numberrs below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint exception)
## In-flight message loss on Camel context shutdown
# BatchCommitConsumerTask activations are unbalanced during periods of low activity, meaning:
## await() (:165) will timeout for active BatchCommitConsumerTask(s) when other consumer threads are binding on it.hasNext() (:145) (blocking call, despite no @throws)
## Any, previously-activated await()'ing thread will (a) get a TimeoutExeception, (b) loop, and (c) get a BrokenBarrierException on the next await() call and (d) exit
## Process will repeat until (a) all consumer stream threads have exited, (b) leaving consumer dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
# An ExecutorService is obtained from Camel to handle KafkaStreams with # of threads set to the consumerStreams param (:77). Since the # of KafkaStreams actually created is (consumersCount * consumerStreams) and executor runnables are indefinite loops, a random selection of streams will not be serviced if consumersCount>1.

Source code URL:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

We've troubleshot this extensively and reimplemented the KafkaConsumer class with params added to KafkaConfiguration to address these concerns and are happy to submit these back to the community, if interested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)