Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2015/07/16 17:34:04 UTC

[jira] [Commented] (CAMEL-8975) Message loss with batch commit

    [ https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629894#comment-14629894 ] 

Claus Ibsen commented on CAMEL-8975:
------------------------------------

Yeah, sure, we love contributions. Nothing beats being battle-tested in real life, so you are very welcome to work on patch(es).
http://camel.apache.org/contributing

> Message loss with batch commit
> ------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Ubuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center on the Kafka consumer (KafkaConsumer.java, line numbers below):
> # Exchange exceptions/failures ignored at process() (:148), meaning:
> ## Automatic offset commit on exchange failure (e.g., processor/endpoint exception)
> ## In-flight exchange loss on Camel context/runtime shutdown (i.e., route interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) when other consumer threads are blocking on it.hasNext() (:145) (a blocking call, despite no @throws)
> ## Any previously activated, await()'ing thread will (a) get a TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next await() call, and (d) exit
> ## The process repeats until all consumer stream threads have exited, leaving the consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams with # of threads set to the consumerStreams param (:77). Since the # of KafkaStreams actually created is (consumersCount * consumerStreams) and executor runnables are indefinite loops, a random selection of streams will not be serviced if consumersCount>1.
> Source code URL:
> - https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class with params added to KafkaConfiguration to address these concerns and are happy to submit these back to the community, if interested.
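
The barrier behaviour described in the second item can be reproduced outside Camel with a plain java.util.concurrent.CyclicBarrier. The sketch below is not the KafkaConsumer code; the class name BarrierTimeoutDemo and the 100 ms timeout are made up for illustration. It only shows that once a timed await() expires, the barrier is left broken and the next await() on it fails immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of camel-kafka.
class BarrierTimeoutDemo {

    // Calls await() twice on a two-party barrier whose second party never
    // arrives (like a sibling thread stuck in a blocking hasNext()), and
    // records what each call did.
    static List<String> awaitTwice() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        List<String> outcomes = new ArrayList<String>();
        for (int i = 0; i < 2; i++) {
            try {
                barrier.await(100, TimeUnit.MILLISECONDS);
                outcomes.add("passed");
            } catch (TimeoutException e) {
                // The timeout also marks the barrier as broken.
                outcomes.add("TimeoutException");
            } catch (BrokenBarrierException e) {
                // A broken barrier rejects every later await() until reset().
                outcomes.add("BrokenBarrierException");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                outcomes.add("interrupted");
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // prints [TimeoutException, BrokenBarrierException]
        System.out.println(awaitTwice());
    }
}
```

A loop that treats both exceptions as "try again" will therefore never pass the barrier again unless something calls reset() on it.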

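The third item (more never-ending stream loops than pool threads) can be illustrated in the same way. This is a minimal sketch under the assumption that the executor behaves like a fixed-size pool with an unbounded queue; whether the ExecutorService Camel hands out is configured exactly like that is not confirmed here. StarvedStreamsDemo and countStarted are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of camel-kafka.
class StarvedStreamsDemo {

    // Submits `tasks` never-ending loops to a pool of `threads` workers and
    // returns how many of them ever started running.
    static int countStarted(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        final AtomicInteger started = new AtomicInteger();
        final CountDownLatch workersBusy = new CountDownLatch(Math.min(threads, tasks));
        for (int i = 0; i < tasks; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    started.incrementAndGet();
                    workersBusy.countDown();
                    // Stand-in for a stream loop: runs until interrupted.
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        try {
            workersBusy.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // grace period in which queued tasks could start, if a worker were free
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int result = started.get();
        pool.shutdownNow(); // interrupts the running loops and drops the queued tasks
        return result;
    }

    public static void main(String[] args) {
        // e.g. consumerStreams = 2 threads, consumersCount * consumerStreams = 4 loops
        System.out.println(countStarted(2, 4)); // prints 2
    }
}
```

With as many threads as loops, for example countStarted(4, 4), every loop gets a worker, which is the sizing the report implies.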


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)