You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by SRIKANT MVS <sr...@gmail.com> on 2021/06/10 15:25:49 UTC

Re: Kafka consumer wont recover after WakeupException

Hi Team,
I upgraded the camel version to 3.9 and the kafka-client jar upgraded to
2.7.0. I could see that kafka consumers are able to resume from the
WakeupException and InterruptedException. However, I see a new issue. I see
the below error message that gets printed endlessly until the consumer
service is restarted manually.

2021-06-09 23:02:09.228+0000 ERROR Camel (camel-1) thread #1 -
KafkaConsumer[testTopicV1]
[camel.processor.errorhandler.DefaultErrorHandler(log:205)] Failed delivery
for (MessageId: *D3D1C394132DC66-0000000000000001 *on ExchangeId:
D3D1C394132DC66-0000000000000001). Exhausted after delivery attempt: 1
caught: java.lang.IllegalStateException: Consumer is not subscribed to any
topics or assigned any partitions

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor
                                               Elapsed (ms)
[route1            ] [route1            ]
[from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [
   7]
[route1            ] [process3          ] [Processor@0x77c233af
                                             ] [         0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: Consumer is not subscribed to any topics
or assigned any partitions
 at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
~[kafka-clients-2.7.0.jar!/:?]
 at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1161)
~[kafka-clients-2.7.0.jar!/:?]
 at
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doPollRun(KafkaConsumer.java:345)
[camel-kafka-3.9.0.jar!/:3.9.0]
 at
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:273)
[camel-kafka-3.9.0.jar!/:3.9.0]
 at
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:238)
[camel-kafka-3.9.0.jar!/:3.9.0]
 at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
[?:?]
 at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
 at java.lang.Thread.run(Thread.java:829) [?:?]

Exception keeps appearing something like this:
1st occurance of the exception at time *23:00:34.195* prints
D3D1C394132DC66-0000000000000000
2nd occurrence happens at  *23:02:09.228*  prints
D3D1C394132DC66-000000000000000*1*
3rd occurrence happens at  *23:02:14.235*  prints
D3D1C394132DC66-000000000000000*2*
4th occurrence happens at  *23:02:19.243*  prints
D3D1C394132DC66-000000000000000*3*

All these are printed by the same thread which was initially processing the
original request before the exception started.

However, when I checked the logs, the consumer threads are getting
subscribed back to the topic.

Not sure why this message is not processed and errors out endlessly. Any
inputs are appreciated.

-Regards
Srikant Mantha





On Thu, Apr 1, 2021 at 6:44 PM SRIKANT MVS <sr...@gmail.com> wrote:

> Hi Andrea/Claus,
> Thanks for the quick response. I am seeing this issue from camel 3.1.0,
> also migration to camel 3.4.5 (java 8 support) and kafka client upgraded to
> 2.5.0. Still the same issue is seen.
>
> Let me also give a try upgrading to 3.9.0.
>
> -Regards
> Srikant Mantha
>
> On Thu, Apr 1, 2021 at 6:37 PM Andrea Cosentino <an...@gmail.com> wrote:
>
>> Please try with an LTS version 3.7.x. 3.2.0 was a development version.
>>
>> Il giorno gio 1 apr 2021 alle ore 18:30 SRIKANT MVS <
>> srikant.mvs@gmail.com>
>> ha scritto:
>>
>> > HI Team,
>> >
>> > I am using camel-kafka (version: 3.2.0) for consuming messages.
>> > Below is the flow
>> >
>> >    1. Kafka service consumes events from the topic
>> >    2. Make a call to the Server
>> >    3. When the server is not responding in 40ms, throw
>> >    ServerUnavailableException
>> >    4. Stopping Kafka consumer on the topic
>> >    5. Unsubscribing from the topic testTopicV1
>> >    6. *Error unsubscribing testTopicV1-Thread 0 from kafka topic
>> >    testTopicV1. Caused by:
>> [org.apache.kafka.common.errors.WakeupException
>> > -
>> >    null]*
>> >    7.
>> >
>> >
>> > *Also seen an InterruptedException with the message "Interrupted while
>> >    waiting for consumer heartbeat thread to close" Once this happens,
>> then
>> > the
>> >    Kafka consumer never recovers and I have to start the consumer
>> service
>> >    manually. Any idea why this happens and how to mitigate from this
>> issue
>> > ?
>> >    2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 -
>> >    KafkaConsumer[testTopicV1]
>> > [camel.component.kafka.KafkaConsumer(log:212)]
>> >    Error unsubscribing testTopicV1-Thread 0 from kafka topic
>> testTopicV1.
>> >    Caused by: [org.apache.kafka.common.errors.WakeupException - null]
>> >    2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> > [camel.component.kafka.KafkaConsumer(doRun:397)]
>> >    Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
>> >    org.apache.kafka.common.errors.WakeupException: null at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
>> >    [?:1.8.0_271] at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> >    Source) [?:1.8.0_271] at
>> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source)
>> [?:1.8.0_271]Full
>> >    StackTrace 2021-01-31 05:48:13.138+0000 ERROR Camel (camel-1) thread
>> #7
>> > -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.processor.errorhandler.DefaultErrorHandler(log:203)] Failed
>> > delivery
>> >    for (MessageId: ID-TestId-1611647443253-0-25 on ExchangeId:
>> >    ID-TestId-1611647443253-0-25). Exhausted after delivery attempt: 1
>> > caught:
>> >    com.example.exception.ServerUnavailableException: RestClientException
>> >    during rest call Message History (complete message history is
>> disabled)
>> >
>> >
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> >    RouteId ProcessorId Processor Elapsed (ms) [route1 ] [route1 ]
>> >
>> >
>> [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [
>> >    40027] ... [route1 ] [to1 ] [bean:kafkaProcessor ] [ 0] Stacktrace
>> >
>> >
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> >    com.example.exception.ServerUnavailableException: RestClientException
>> >    during rest call at
>> >    com.example.processor.KafkaProcessor.process(KafkaProcessor.java:46)
>> >    ~[classes!/:?] at
>> >
>> >
>> org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:117)
>> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:56)
>> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41)
>> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
>> >
>> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:166)
>> >    ~[camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:702)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:616)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >    org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
>> >    [camel-base-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40)
>> >    [camel-support-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:338)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
>> >    [?:1.8.0_271] at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> >    Source) [?:1.8.0_271] at
>> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> > Caused
>> >    by: org.springframework.web.client.ResourceAccessException: I/O
>> error on
>> >    POST request for "http://xxxxxxx": Read timed out; nested exception
>> is
>> >    java.net.SocketTimeoutException: Read timed out at
>> >
>> >
>> org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:746)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] ... 21 more Caused
>> by:
>> >    java.net.SocketTimeoutException: Read timed out at
>> >    java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_271]
>> at
>> >    java.net.SocketInputStream.socketRead(Unknown Source) ~[?:1.8.0_271]
>> at
>> >    java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271] at
>> >    java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271] at
>> >    org.apache.http.impl.io
>> > .SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >    org.apache.http.impl.io
>> > .SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >    org.apache.http.impl.io
>> > .SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >
>> >
>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >    org.apache.http.impl.io
>> > .AbstractMessageParser.parse(AbstractMessageParser.java:259)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >
>> >
>> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >
>> >
>> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >
>> >
>> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
>> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
>> >
>> >
>> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >    org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
>> >
>> >
>> org.springframework.http.client.HttpComponentsClientHttpRequest.executeInternal(HttpComponentsClientHttpRequest.java:87)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:109)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.boot.actuate.metrics.web.client.MetricsClientHttpRequestInterceptor.intercept(MetricsClientHttpRequestInterceptor.java:95)
>> >    ~[spring-boot-actuator-2.2.11.RELEASE.jar!/:2.2.11.RELEASE] at
>> >
>> >
>> org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:77)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
>> >
>> >
>> org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447)
>> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] ... 21 more
>> 2021-01-31
>> >    05:48:13.139+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.component.kafka.KafkaConsumer(doStop:141)] Stopping Kafka
>> > consumer
>> >    on topic: testTopicV1 2021-01-31 05:48:15.140+0000 INFO Camel
>> (camel-1)
>> >    thread #7 - KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
>> > Waited
>> >    2.000 seconds for ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
>> >    05:48:17.141+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
>> > Waited
>> >    4.001 seconds for ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
>> >    05:48:19.142+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
>> > Waited
>> >    6.001 seconds for ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
>> >    05:48:21.147+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
>> > Waited
>> >    8.007 seconds for ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
>> >    05:48:23.141+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
>> > Waited
>> >    10.000 seconds for ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
>> >    05:48:23.141+0000 WARN Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:305)]
>> Forcing
>> >    shutdown of ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] due first await termination elapsed.
>> >    2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:314)]
>> Forcing
>> >    shutdown of ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] due interrupted. 2021-01-31
>> >    05:48:23.141+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:322)]
>> Shutdown
>> > of
>> >    ExecutorService:
>> >
>> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
>> > [Shutting
>> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
>> > tasks
>> >    = 0][KafkaConsumer[testTopicV1]] is shutdown: true and terminated:
>> false
>> >    took: 10.001 seconds. 2021-01-31 05:48:23.142+0000 INFO Camel
>> (camel-1)
>> >    thread #6 - KafkaConsumer[testTopicV1]
>> >    [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing
>> >    testTopicV1-Thread 0 from topic testTopicV1 2021-01-31
>> 05:48:23.143+0000
>> >    INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
>> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
>> Revoke
>> >    previously assigned partitions testTopicV1-13, testTopicV1-12,
>> >    testTopicV1-14, testTopicV1-11, testTopicV1-10 2021-01-31
>> > 05:48:23.142+0000
>> >    WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1]
>> >    [camel.component.kafka.KafkaConsumer(log:212)] Error during
>> processing.
>> >    Exchange[ID-TestId-1611647443253-0-25]. Caused by:
>> >    [ServerUnavailableException - RestClientException during rest call]
>> >    ServerUnavailableException: RestClientException during rest call
>> > 2021-01-31
>> >    05:48:23.143+0000 INFO kafka-coordinator-heartbeat-thread |
>> TestService
>> >
>> >
>> [clients.consumer.internals.AbstractCoordinator(markCoordinatorUnknown:808)]
>> >    [Consumer clientId=consumer-TestService-5, groupId=TestService] Group
>> >    coordinator KafkaBroker3:9093 (id: 2147482641 rack: null) is
>> > unavailable or
>> >    invalid, will attempt rediscovery 2021-01-31 05:48:23.143+0000 WARN
>> > Camel
>> >    (camel-1) thread #6 - KafkaConsumer[testTopicV1]
>> >    [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing
>> >    testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by:
>> >    [org.apache.kafka.common.errors.WakeupException - null] 2021-01-31
>> >    05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> > [camel.component.kafka.KafkaConsumer(doRun:397)]
>> >    Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
>> >    org.apache.kafka.common.errors.WakeupException: null at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
>> >    [?:1.8.0_271] at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> >    Source) [?:1.8.0_271] at
>> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> >    2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
>> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
>> Revoke
>> >    previously assigned partitions testTopicV1-24, testTopicV1-21,
>> >    testTopicV1-20, testTopicV1-23, testTopicV1-22 2021-01-31
>> > 05:48:23.143+0000
>> >    WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)]
>> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
>> > Interrupted
>> >    while waiting for consumer heartbeat thread to close 2021-01-31
>> >    05:48:23.144+0000 INFO Camel (camel-1) thread #6 -
>> >    KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
>> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
>> Revoke
>> >    previously assigned partitions testTopicV1-13, testTopicV1-12,
>> >    testTopicV1-14, testTopicV1-11, testTopicV1-10 2021-01-31
>> > 05:48:23.144+0000
>> >    WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1]
>> >    [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing
>> >    testTopicV1-Thread 1 from kafka topic testTopicV1. Caused by:
>> >    [org.apache.kafka.common.errors.WakeupException - null]
>> >    org.apache.kafka.common.errors.WakeupException: null .... 2021-01-31
>> >    05:48:23.144+0000 WARN Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)]
>> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
>> > Interrupted
>> >    while waiting for consumer heartbeat thread to close 2021-01-31
>> >    05:48:23.144+0000 INFO Camel (camel-1) thread #7 -
>> >    KafkaConsumer[testTopicV1]
>> >
>> >
>> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
>> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
>> Revoke
>> >    previously assigned partitions testTopicV1-24, testTopicV1-21,
>> >    testTopicV1-20, testTopicV1-23, testTopicV1-22 2021-01-31
>> > 05:48:23.144+0000
>> >    ERROR Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
>> >    [kafka.clients.consumer.KafkaConsumer(close:2297)] [Consumer
>> >    clientId=consumer-TestService-5, groupId=TestService] Failed to close
>> >    coordinator org.apache.kafka.common.errors.InterruptException:
>> >    java.lang.InterruptedException at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:517)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:932)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
>> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:887)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:832)
>> >    ~[kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2294)
>> >    [kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2261)
>> >    [kafka-clients-2.4.0.jar!/:?] at
>> >
>> >
>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211)
>> >    [kafka-clients-2.4.0.jar!/:?] at
>> >    org.apache.camel.util.IOHelper.close(IOHelper.java:341)
>> >    [camel-util-3.2.0.jar!/:3.2.0] at
>> >    org.apache.camel.util.IOHelper.close(IOHelper.java:403)
>> >    [camel-util-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:419)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >
>> >
>> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
>> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
>> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
>> >    [?:1.8.0_271] at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> >    Source) [?:1.8.0_271] at
>> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> > Caused
>> >    by: java.lang.InterruptedException ... 26 more *
>> >
>> > *-RegardsSrikant Mantha*
>> >
>>
>

Re: Kafka consumer wont recover after WakeupException

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Can you try with Camel 3.11 (on the way) and Kafka 2.8 clients.

On Thu, Jun 10, 2021 at 5:26 PM SRIKANT MVS <sr...@gmail.com> wrote:
>
> Hi Team,
> I upgraded the camel version to 3.9 and the kafka-client jar upgraded to
> 2.7.0. I could see that kafka consumers are able to resume from the
> WakeupException and InterruptedException. However, I see a new issue. I see
> the below error message that gets printed endlessly until the consumer
> service is restarted manually.
>
> 2021-06-09 23:02:09.228+0000 ERROR Camel (camel-1) thread #1 -
> KafkaConsumer[testTopicV1]
> [camel.processor.errorhandler.DefaultErrorHandler(log:205)] Failed delivery
> for (MessageId: *D3D1C394132DC66-0000000000000001 *on ExchangeId:
> D3D1C394132DC66-0000000000000001). Exhausted after delivery attempt: 1
> caught: java.lang.IllegalStateException: Consumer is not subscribed to any
> topics or assigned any partitions
>
> Message History (complete message history is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> RouteId              ProcessorId          Processor
>                                                Elapsed (ms)
> [route1            ] [route1            ]
> [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [
>    7]
> [route1            ] [process3          ] [Processor@0x77c233af
>                                              ] [         0]
>
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> java.lang.IllegalStateException: Consumer is not subscribed to any topics
> or assigned any partitions
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
> ~[kafka-clients-2.7.0.jar!/:?]
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1161)
> ~[kafka-clients-2.7.0.jar!/:?]
>  at
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doPollRun(KafkaConsumer.java:345)
> [camel-kafka-3.9.0.jar!/:3.9.0]
>  at
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:273)
> [camel-kafka-3.9.0.jar!/:3.9.0]
>  at
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:238)
> [camel-kafka-3.9.0.jar!/:3.9.0]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>  at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Exception keeps appearing something like this:
> 1st occurance of the exception at time *23:00:34.195* prints
> D3D1C394132DC66-0000000000000000
> 2nd occurrence happens at  *23:02:09.228*  prints
> D3D1C394132DC66-000000000000000*1*
> 3rd occurrence happens at  *23:02:14.235*  prints
> D3D1C394132DC66-000000000000000*2*
> 4th occurrence happens at  *23:02:19.243*  prints
> D3D1C394132DC66-000000000000000*3*
>
> All these are printed by the same thread which was initially processing the
> original request before the exception started.
>
> However, when I checked the logs, the consumer threads are getting
> subscribed back to the topic.
>
> Not sure why this message is not processed and errors out endlessly. Any
> inputs are appreciated.
>
> -Regards
> Srikant Mantha
>
>
>
>
>
> On Thu, Apr 1, 2021 at 6:44 PM SRIKANT MVS <sr...@gmail.com> wrote:
>
> > Hi Andrea/Claus,
> > Thanks for the quick response. I am seeing this issue from camel 3.1.0,
> > also migration to camel 3.4.5 (java 8 support) and kafka client upgraded to
> > 2.5.0. Still the same issue is seen.
> >
> > Let me also give a try upgrading to 3.9.0.
> >
> > -Regards
> > Srikant Mantha
> >
> > On Thu, Apr 1, 2021 at 6:37 PM Andrea Cosentino <an...@gmail.com> wrote:
> >
> >> Please try with an LTS version 3.7.x. 3.2.0 was a development version.
> >>
> >> Il giorno gio 1 apr 2021 alle ore 18:30 SRIKANT MVS <
> >> srikant.mvs@gmail.com>
> >> ha scritto:
> >>
> >> > HI Team,
> >> >
> >> > I am using camel-kafka (version: 3.2.0) for consuming messages.
> >> > Below is the flow
> >> >
> >> >    1. Kafka service consumes events from the topic
> >> >    2. Make a call to the Server
> >> >    3. When the server is not responding in 40ms, throw
> >> >    ServerUnavailableException
> >> >    4. Stopping Kafka consumer on the topic
> >> >    5. Unsubscribing from the topic testTopicV1
> >> >    6. *Error unsubscribing testTopicV1-Thread 0 from kafka topic
> >> >    testTopicV1. Caused by:
> >> [org.apache.kafka.common.errors.WakeupException
> >> > -
> >> >    null]*
> >> >    7.
> >> >
> >> >
> >> > *Also seen an InterruptedException with the message "Interrupted while
> >> >    waiting for consumer heartbeat thread to close" Once this happens,
> >> then
> >> > the
> >> >    Kafka consumer never recovers and I have to start the consumer
> >> service
> >> >    manually. Any idea why this happens and how to mitigate from this
> >> issue
> >> > ?
> >> >    2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 -
> >> >    KafkaConsumer[testTopicV1]
> >> > [camel.component.kafka.KafkaConsumer(log:212)]
> >> >    Error unsubscribing testTopicV1-Thread 0 from kafka topic
> >> testTopicV1.
> >> >    Caused by: [org.apache.kafka.common.errors.WakeupException - null]
> >> >    2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> > [camel.component.kafka.KafkaConsumer(doRun:397)]
> >> >    Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
> >> >    org.apache.kafka.common.errors.WakeupException: null at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> >> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
> >> >    [?:1.8.0_271] at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> >> >    Source) [?:1.8.0_271] at
> >> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source)
> >> [?:1.8.0_271]Full
> >> >    StackTrace 2021-01-31 05:48:13.138+0000 ERROR Camel (camel-1) thread
> >> #7
> >> > -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.processor.errorhandler.DefaultErrorHandler(log:203)] Failed
> >> > delivery
> >> >    for (MessageId: ID-TestId-1611647443253-0-25 on ExchangeId:
> >> >    ID-TestId-1611647443253-0-25). Exhausted after delivery attempt: 1
> >> > caught:
> >> >    com.example.exception.ServerUnavailableException: RestClientException
> >> >    during rest call Message History (complete message history is
> >> disabled)
> >> >
> >> >
> >> ---------------------------------------------------------------------------------------------------------------------------------------
> >> >    RouteId ProcessorId Processor Elapsed (ms) [route1 ] [route1 ]
> >> >
> >> >
> >> [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [
> >> >    40027] ... [route1 ] [to1 ] [bean:kafkaProcessor ] [ 0] Stacktrace
> >> >
> >> >
> >> ---------------------------------------------------------------------------------------------------------------------------------------
> >> >    com.example.exception.ServerUnavailableException: RestClientException
> >> >    during rest call at
> >> >    com.example.processor.KafkaProcessor.process(KafkaProcessor.java:46)
> >> >    ~[classes!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:117)
> >> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:56)
> >> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41)
> >> >    ~[camel-bean-3.2.0.jar!/:3.2.0] at
> >> >
> >> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:166)
> >> >    ~[camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:702)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:616)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >    org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
> >> >    [camel-base-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40)
> >> >    [camel-support-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:338)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> >> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
> >> >    [?:1.8.0_271] at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> >> >    Source) [?:1.8.0_271] at
> >> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
> >> > Caused
> >> >    by: org.springframework.web.client.ResourceAccessException: I/O
> >> error on
> >> >    POST request for "http://xxxxxxx": Read timed out; nested exception
> >> is
> >> >    java.net.SocketTimeoutException: Read timed out at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:746)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] ... 21 more Caused
> >> by:
> >> >    java.net.SocketTimeoutException: Read timed out at
> >> >    java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_271]
> >> at
> >> >    java.net.SocketInputStream.socketRead(Unknown Source) ~[?:1.8.0_271]
> >> at
> >> >    java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271] at
> >> >    java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271] at
> >> >    org.apache.http.impl.io
> >> > .SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >    org.apache.http.impl.io
> >> > .SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >    org.apache.http.impl.io
> >> > .SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >
> >> >
> >> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >    org.apache.http.impl.io
> >> > .AbstractMessageParser.parse(AbstractMessageParser.java:259)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >
> >> >
> >> org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >
> >> >
> >> org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >
> >> >
> >> org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
> >> >    ~[httpcore-4.4.13.jar!/:4.4.13] at
> >> >
> >> >
> >> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >    org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> >> >    ~[httpclient-4.5.12.jar!/:4.5.12] at
> >> >
> >> >
> >> org.springframework.http.client.HttpComponentsClientHttpRequest.executeInternal(HttpComponentsClientHttpRequest.java:87)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:109)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.boot.actuate.metrics.web.client.MetricsClientHttpRequestInterceptor.intercept(MetricsClientHttpRequestInterceptor.java:95)
> >> >    ~[spring-boot-actuator-2.2.11.RELEASE.jar!/:2.2.11.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:77)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] at
> >> >
> >> >
> >> org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447)
> >> >    ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE] ... 21 more
> >> 2021-01-31
> >> >    05:48:13.139+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.component.kafka.KafkaConsumer(doStop:141)] Stopping Kafka
> >> > consumer
> >> >    on topic: testTopicV1 2021-01-31 05:48:15.140+0000 INFO Camel
> >> (camel-1)
> >> >    thread #7 - KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
> >> > Waited
> >> >    2.000 seconds for ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
> >> >    05:48:17.141+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
> >> > Waited
> >> >    4.001 seconds for ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
> >> >    05:48:19.142+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
> >> > Waited
> >> >    6.001 seconds for ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
> >> >    05:48:21.147+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
> >> > Waited
> >> >    8.007 seconds for ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
> >> >    05:48:23.141+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)]
> >> > Waited
> >> >    10.000 seconds for ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] to terminate... 2021-01-31
> >> >    05:48:23.141+0000 WARN Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:305)]
> >> Forcing
> >> >    shutdown of ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] due first await termination elapsed.
> >> >    2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:314)]
> >> Forcing
> >> >    shutdown of ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] due interrupted. 2021-01-31
> >> >    05:48:23.141+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >    [camel.impl.engine.BaseExecutorServiceManager(doShutdown:322)]
> >> Shutdown
> >> > of
> >> >    ExecutorService:
> >> >
> >> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0
> >> > [Shutting
> >> >    down, pool size = 2, active threads = 2, queued tasks = 0, completed
> >> > tasks
> >> >    = 0][KafkaConsumer[testTopicV1]] is shutdown: true and terminated:
> >> false
> >> >    took: 10.001 seconds. 2021-01-31 05:48:23.142+0000 INFO Camel
> >> (camel-1)
> >> >    thread #6 - KafkaConsumer[testTopicV1]
> >> >    [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing
> >> >    testTopicV1-Thread 0 from topic testTopicV1 2021-01-31
> >> 05:48:23.143+0000
> >> >    INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
> >> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
> >> Revoke
> >> >    previously assigned partitions testTopicV1-13, testTopicV1-12,
> >> >    testTopicV1-14, testTopicV1-11, testTopicV1-10 2021-01-31
> >> > 05:48:23.142+0000
> >> >    WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1]
> >> >    [camel.component.kafka.KafkaConsumer(log:212)] Error during
> >> processing.
> >> >    Exchange[ID-TestId-1611647443253-0-25]. Caused by:
> >> >    [ServerUnavailableException - RestClientException during rest call]
> >> >    ServerUnavailableException: RestClientException during rest call
> >> > 2021-01-31
> >> >    05:48:23.143+0000 INFO kafka-coordinator-heartbeat-thread |
> >> TestService
> >> >
> >> >
> >> [clients.consumer.internals.AbstractCoordinator(markCoordinatorUnknown:808)]
> >> >    [Consumer clientId=consumer-TestService-5, groupId=TestService] Group
> >> >    coordinator KafkaBroker3:9093 (id: 2147482641 rack: null) is
> >> > unavailable or
> >> >    invalid, will attempt rediscovery 2021-01-31 05:48:23.143+0000 WARN
> >> > Camel
> >> >    (camel-1) thread #6 - KafkaConsumer[testTopicV1]
> >> >    [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing
> >> >    testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by:
> >> >    [org.apache.kafka.common.errors.WakeupException - null] 2021-01-31
> >> >    05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> > [camel.component.kafka.KafkaConsumer(doRun:397)]
> >> >    Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
> >> >    org.apache.kafka.common.errors.WakeupException: null at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> >> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
> >> >    [?:1.8.0_271] at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> >> >    Source) [?:1.8.0_271] at
> >> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
> >> >    2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
> >> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
> >> Revoke
> >> >    previously assigned partitions testTopicV1-24, testTopicV1-21,
> >> >    testTopicV1-20, testTopicV1-23, testTopicV1-22 2021-01-31
> >> > 05:48:23.143+0000
> >> >    WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)]
> >> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
> >> > Interrupted
> >> >    while waiting for consumer heartbeat thread to close 2021-01-31
> >> >    05:48:23.144+0000 INFO Camel (camel-1) thread #6 -
> >> >    KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
> >> >    [Consumer clientId=consumer-TestService-5, groupId=TestService]
> >> Revoke
> >> >    previously assigned partitions testTopicV1-13, testTopicV1-12,
> >> >    testTopicV1-14, testTopicV1-11, testTopicV1-10 2021-01-31
> >> > 05:48:23.144+0000
> >> >    WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1]
> >> >    [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing
> >> >    testTopicV1-Thread 1 from kafka topic testTopicV1. Caused by:
> >> >    [org.apache.kafka.common.errors.WakeupException - null]
> >> >    org.apache.kafka.common.errors.WakeupException: null .... 2021-01-31
> >> >    05:48:23.144+0000 WARN Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)]
> >> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
> >> > Interrupted
> >> >    while waiting for consumer heartbeat thread to close 2021-01-31
> >> >    05:48:23.144+0000 INFO Camel (camel-1) thread #7 -
> >> >    KafkaConsumer[testTopicV1]
> >> >
> >> >
> >> [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)]
> >> >    [Consumer clientId=consumer-TestService-6, groupId=TestService]
> >> Revoke
> >> >    previously assigned partitions testTopicV1-24, testTopicV1-21,
> >> >    testTopicV1-20, testTopicV1-23, testTopicV1-22 2021-01-31
> >> > 05:48:23.144+0000
> >> >    ERROR Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1]
> >> >    [kafka.clients.consumer.KafkaConsumer(close:2297)] [Consumer
> >> >    clientId=consumer-TestService-5, groupId=TestService] Failed to close
> >> >    coordinator org.apache.kafka.common.errors.InterruptException:
> >> >    java.lang.InterruptedException at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:517)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:932)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455)
> >> >    ~[camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:887)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:832)
> >> >    ~[kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2294)
> >> >    [kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2261)
> >> >    [kafka-clients-2.4.0.jar!/:?] at
> >> >
> >> >
> >> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211)
> >> >    [kafka-clients-2.4.0.jar!/:?] at
> >> >    org.apache.camel.util.IOHelper.close(IOHelper.java:341)
> >> >    [camel-util-3.2.0.jar!/:3.2.0] at
> >> >    org.apache.camel.util.IOHelper.close(IOHelper.java:403)
> >> >    [camel-util-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:419)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >
> >> >
> >> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214)
> >> >    [camel-kafka-3.2.0.jar!/:3.2.0] at
> >> >    java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> >> >    [?:1.8.0_271] at java.util.concurrent.FutureTask.run(Unknown Source)
> >> >    [?:1.8.0_271] at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> >> >    Source) [?:1.8.0_271] at
> >> >    java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >> >    [?:1.8.0_271] at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
> >> > Caused
> >> >    by: java.lang.InterruptedException ... 26 more *
> >> >
> >> > *-RegardsSrikant Mantha*
> >> >
> >>
> >



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2