You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Srikant Mantha (Jira)" <ji...@apache.org> on 2019/11/05 09:34:00 UTC

[jira] [Commented] (CAMEL-14010) Camel-Kafka ConsumerCount drops to 1 (default) from the defined value

    [ https://issues.apache.org/jira/browse/CAMEL-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967378#comment-16967378 ] 

Srikant Mantha commented on CAMEL-14010:
----------------------------------------

[~acosentino] [~davsclaus] The issue seem to be resolved after using Camel 3 version latest jar. Can you tell when would the feature release be available to use ?

> Camel-Kafka ConsumerCount drops to 1 (default) from the defined value
> ---------------------------------------------------------------------
>
>                 Key: CAMEL-14010
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14010
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>            Reporter: Srikant Mantha
>            Priority: Major
>
> Iam using kafka consumer in our application integrated with Camel. 
>  We consume the messages and send the data to the server for processing.
> There is one topic "*XYZ*" defined with *30* partitions and I have assigned *15* as consumer count on each consumer node (total 2 instances)
> /*** Camel Consumer Configuration ***/
> {code:java}
> kafka.consumersCount=15
>  kafka.consumerStreams=15{code}
> {{I see from the logs that when the consumer starts, there are *15* consumer threads (lets say on 1 node), which is good as configured.I see from the logs that when the consumer starts, there are 15 consumer threads (lets say on 1 node), which is good as configured.}}
>  
>  
> {code:java}
> {code}
> {{INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ }}
> {{INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ}}
>  
> If server stops responding due to network issue or any other scenario when the server is unavailable, then all the kafka consumers starts unsubscribing which is again an expected behavior (so far good)
> Note: We have defined the Camel
> {code:java}
> ThrottlingExceptionRoutePolicy {code}
> which does a health check call on the server before sending the consumed message.
>  Once the server is back and available, *I see that not all 15 consumer threads are active*, but only 1 (*I guess this is the default value*). 
> From the logs below, I observe that the consumerss are getting subscribed and unsubscribed one by one from the topic and finally the application runs with only a single consumer count. This is really strange to see.
> {{INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{auto.commit.interval.ms = 5000 }}
> {{auto.offset.reset = latest }}
> {{bootstrap.servers = [ListOfDefinedServers] }}
> {{check.crcs = true }}
> {{client.id =  }}
> {{connections.max.idle.ms = 540000 }}
> {{enable.auto.commit = false }}
> {{exclude.internal.topics = true }}
> {{fetch.max.bytes = 52428800 }}
> {{fetch.max.wait.ms = 500 }}
> {{fetch.min.bytes = 1 }}
> {{group.id = XYZ-GroupId-12345 }}
> {{heartbeat.interval.ms = 3000 }}
> {{interceptor.classes = null }}
> {{internal.leave.group.on.close = true }}
> {{isolation.level = read_uncommitted }}
> {{key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer }}
> {{max.partition.fetch.bytes = 1048576 }}
> {{max.poll.interval.ms = 300000 }}
> {{max.poll.records = 500 }}
> {{metadata.max.age.ms = 5000 }}
> {{metric.reporters = [] metrics.num.samples = 2 }}
> {{metrics.recording.level = INFO }}
> {{metrics.sample.window.ms = 30000}}
> {{partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] }}
> {{receive.buffer.bytes = 65536 }}
> {{reconnect.backoff.max.ms = 1000 }}
> {{reconnect.backoff.ms = 50 }}
> {{request.timeout.ms = 40000 }}
> {{retry.backoff.ms = 100 }}
> {{sasl.jaas.config = null }}
> {{sasl.kerberos.kinit.cmd = /usr/bin/kinit }}
> {{sasl.kerberos.min.time.before.relogin = 60000 }}
> {{sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer }}
> {{INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 from topic XYZ}}
> INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined
> {{INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator servername (id: 2147482644 rack: null) INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously assigned partitions [] INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with generation 1 INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6]}}
>  
> How to fix this issue ?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)